diff --git a/convert/convert.cpp b/convert/convert.cpp index f5732cb..98d35ac 100644 --- a/convert/convert.cpp +++ b/convert/convert.cpp @@ -15,9 +15,9 @@ * where Index_Map is the address that the Index file mmapped into memory. * 3) Edge file (binary, .edge suffix), which stores all the outgoing edges according to the sequence of * index of vertices (i.e., the source vertex ID of the edges). Entries are tuples of the form: --------------------------------------------------------------------------------- -COMPACT | <4 byte dst, 4bytes weight> --------------------------------------------------------------------------------- + -------------------------------------------------------------------------------- + COMPACT | <4 byte dst, 4bytes weight> + -------------------------------------------------------------------------------- * Note: * 1) The first element of Edge file (array) is INTENTIONALLY left UNUSED! * This prevents the situation that the offset of some vertex is zero, @@ -39,7 +39,7 @@ COMPACT | <4 byte dst, 4bytes weight> #include "options_utils_convert.h" #include #include - +#include #include "convert.h" using namespace convert; //boost::program_options::options_description desc; @@ -61,9 +61,11 @@ int main( int argc, const char**argv) std::string out_dir, out_edge_file_name, out_index_file_name, out_desc_file_name, out_desc_file1_name; - //hejian-debug + //hejian-debug std::string snap_type; - std::string out_txt_file_name; + std::string out_txt_file_name; + + char* buffer; //setup options setup_options_convert( argc, argv ); @@ -79,36 +81,38 @@ int main( int argc, const char**argv) out_index_file_name = out_dir+ input_file_name +".index"; out_desc_file_name = out_dir+ input_file_name +".desc"; - out_txt_file_name = out_dir + input_file_name + "-type1.txt"; - - - std::string type1_or_type2 = vm["out-type"].as(); - std::string tmp_type1("type1"); - std::cout << type1_or_type2 << std::endl; - bool with_type1 = false; - unsigned int type1_type2 = 2; - //bool value 1 means type2, 0 means type1 - if (type1_or_type2.compare(tmp_type1) == 0) - { - std::cout << "type1 out edge will be generated!" << std::endl; - //this is type1, so need to add edge value - with_type1 = true; - type1_type2 = 1; - } - - bool with_in_edge = (bool)(vm["in-edge"].as()); - std::cout << with_in_edge << std::endl; - - if (with_in_edge) - { - std::cout << "in-edge will be generated!" <(); + std::string tmp_type1("type1"); + std::cout << type1_or_type2 << std::endl; + bool with_type1 = false; + unsigned int type1_type2 = 2; + //bool value 1 means type2, 0 means type1 + if (type1_or_type2.compare(tmp_type1) == 0) + { + std::cout << "type1 out edge will be generated!" << std::endl; + //this is type1, so need to add edge value + with_type1 = true; + type1_type2 = 1; + } + + bool with_in_edge = (bool)(vm["in-edge"].as()); + std::cout << with_in_edge << std::endl; + + //code changed for the need of sorting the disorder file + if (with_in_edge) + { + std::cout << "in-edge will be generated!" <(); @@ -123,14 +127,15 @@ int main( int argc, const char**argv) process_edgelist( input_graph_name.c_str(), out_edge_file_name.c_str(), out_index_file_name.c_str() , - out_txt_file_name.c_str(), - with_type1, with_in_edge); + out_txt_file_name.c_str(), + out_dir.c_str(), input_file_name.c_str(), + with_type1, with_in_edge); else if (snap_type == "adjlist" ) process_adjlist( input_graph_name.c_str(), out_edge_file_name.c_str(), out_index_file_name.c_str(), - out_txt_file_name.c_str(), - with_type1, with_in_edge); + out_txt_file_name.c_str(), + with_type1, with_in_edge); else{ std::cout << "input parameter (type) error!\n"; exit( -1 ); @@ -143,13 +148,16 @@ int main( int argc, const char**argv) desc_file << "max_vertex_id = " << max_vertex_id << "\n"; desc_file << "num_of_edges = " << num_edges << "\n"; desc_file << "max_out_edges = " << max_out_edges << "\n"; - desc_file << "edge_type = " << type1_type2 << "\n"; - desc_file << "with_in_edge = " << with_in_edge << "\n"; - desc_file.close(); - - //process in-edge - if (with_in_edge == true) - { - } + desc_file << "edge_type = " << type1_type2 << "\n"; + desc_file << "with_in_edge = " << with_in_edge << "\n"; + desc_file.close(); + + //process in-edge + if (with_in_edge == true) + { + } + + munlock( buffer, mem_size); + munmap( buffer, mem_size); } diff --git a/convert/k_way_merge.cpp b/convert/k_way_merge.cpp index dda1357..34c8993 100644 --- a/convert/k_way_merge.cpp +++ b/convert/k_way_merge.cpp @@ -35,471 +35,629 @@ u32_t edge_buffer_offset = 0; u32_t edge_suffix = 0; u32_t vert_suffix = 0; u32_t recent_src_vert = UINT_MAX; +u32_t buffer_offset = 0; u64_t tmp_num_edges = 0; +u64_t buf_index = 0; template class minheap { - T * nodes; - int max_size; - int curr_size; - public: - minheap(int max_size) - { - this->max_size = max_size; - this->nodes = (T*)calloc(this->max_size, sizeof(T)); - this->curr_size = 0; - } - ~minheap() - { - delete nodes; - } - - int parent(int i) - { - return (i+1)/2-1; - } - int left(int i) - { - return 2*i+1; - } - int right(int i) - { - return 2*i+2; - } - - void inc_size() - { - curr_size++; - assert(curr_size<=max_size); - } - void dec_size() - { - curr_size--; - assert(curr_size>=0); - } - bool isempty() - { - return curr_size==0; - } - - void insert(T element) - { - inc_size(); - int pos = curr_size-1; - for(; pos>0&&elementmax_size = max_size; + this->nodes = (T*)calloc(this->max_size, sizeof(T)); + this->curr_size = 0; + } + ~minheap() + { + delete nodes; + } + + int parent(int i) + { + return (i+1)/2-1; + } + int left(int i) + { + return 2*i+1; + } + int right(int i) + { + return 2*i+2; + } + + void inc_size() + { + curr_size++; + assert(curr_size<=max_size); + } + void dec_size() + { + curr_size--; + assert(curr_size>=0); + } + bool isempty() + { + return curr_size==0; + } + + void insert(T element) + { + inc_size(); + int pos = curr_size-1; + for(; pos>0&&elementdst = " << src_vert << "->" << dest_vert << std::endl; - tmp_num_edges ++; - //set the type2_edge_buffer - edge_suffix = tmp_num_edges - (edge_buffer_offset * EDGE_BUFFER_LEN); - in_edge_buffer[edge_suffix].in_vert = src_vert; - //write back if necessary - if (edge_suffix == (EDGE_BUFFER_LEN - 1)) - { - flush_buffer_to_file( in_edge_file, (char*)in_edge_buffer, - EDGE_BUFFER_LEN*sizeof(struct in_edge) ); - memset( (char*)in_edge_buffer, 0, EDGE_BUFFER_LEN*sizeof(struct in_edge) ); - edge_buffer_offset += 1; - } - - //fprintf(tmp_txt_file, "%d\t%d\t\n", value.src_vert, value.dest_vert); - - //is source vertex id continuous? - if (dest_vert != recent_src_vert) - { - if (dest_vert >= (vert_buffer_offset + 1)*VERT_BUFFER_LEN) - { - vert_buffer_offset += 1; - flush_buffer_to_file( in_vert_index_file, (char*)in_vert_buffer, - VERT_BUFFER_LEN*sizeof(struct vert_index) ); - memset( (char*)in_vert_buffer , 0, VERT_BUFFER_LEN*sizeof(struct vert_index) ); - } - vert_suffix = dest_vert - vert_buffer_offset * VERT_BUFFER_LEN; - in_vert_buffer[vert_suffix].offset = tmp_num_edges; - recent_src_vert = dest_vert; - } - //debug end - buf_idx++; - if(buf_idx<=buf_edges) - { - //buffer[buf_idx] = value; - - //lvhuiming debug - if(value.dest_vert != last_one) - { - last_one = value.dest_vert; - //std::cout<dst = " << src_vert << "->" << dest_vert << std::endl; + //asure the first number in in_edge file is 0 + tmp_num_edges ++; + //set the type2_edge_buffer + edge_suffix = tmp_num_edges - (edge_buffer_offset * EDGE_BUFFER_LEN); + in_edge_buffer[edge_suffix].in_vert = src_vert; + //write back if necessary + if (edge_suffix == (EDGE_BUFFER_LEN - 1)) + { + flush_buffer_to_file( in_edge_file, (char*)in_edge_buffer, + EDGE_BUFFER_LEN*sizeof(struct in_edge) ); + memset( (char*)in_edge_buffer, 0, EDGE_BUFFER_LEN*sizeof(struct in_edge) ); + edge_buffer_offset += 1; + } + + //fprintf(tmp_txt_file, "%d\t%d\t\n", value.src_vert, value.dest_vert); + + //is source vertex id continuous? + if (dest_vert != recent_src_vert) + { + if (dest_vert >= (vert_buffer_offset + 1)*VERT_BUFFER_LEN) + { + vert_buffer_offset += 1; + flush_buffer_to_file( in_vert_index_file, (char*)in_vert_buffer, + VERT_BUFFER_LEN*sizeof(struct vert_index) ); + memset( (char*)in_vert_buffer , 0, VERT_BUFFER_LEN*sizeof(struct vert_index) ); + } + vert_suffix = dest_vert - vert_buffer_offset * VERT_BUFFER_LEN; + in_vert_buffer[vert_suffix].offset = tmp_num_edges; + recent_src_vert = dest_vert; + } + //debug end + buf_idx++; + if(buf_idx<=buf_edges) + { + //buffer[buf_idx] = value; + + //lvhuiming debug + if(value.dest_vert != last_one) + { + last_one = value.dest_vert; + //std::cout<value.dest_vert < obj2.value.dest_vert); - } + int source_id; + tmp_in_edge value; + value_source(int id, tmp_in_edge val) : source_id(id), value(val){} + bool operator< (value_source &obj2) + { + return (this->value.dest_vert < obj2.value.dest_vert); + } }; class kway_merge { - std::vector sources; - merge_sink * sink; - minheap m_heap; - int merge_num; - public: - kway_merge(std::vector sources, merge_sink* sink) : sources(sources),sink(sink),m_heap(int(sources.size())) - { - this->merge_num = int(sources.size()); - } - - ~kway_merge() - { - sink = NULL; - } - - void merge() - { - int active_sources = (int)sources.size(); - for(int i=0; iget_next())); - } - - while(active_sources>0 || !m_heap.isempty()) - { - value_source v = m_heap.get_min(); - m_heap.pop_min(); - //std::cout<<"pop_min: "<has_more()) - { - m_heap.insert(value_source(v.source_id, sources[v.source_id]->get_next())); - } - else - { - active_sources--; - } - sink->Add(v.value); - } - sink->finish(); - } + std::vector sources; + merge_sink * sink; + minheap m_heap; + int merge_num; + public: + kway_merge(std::vector sources, merge_sink* sink) : sources(sources),sink(sink),m_heap(int(sources.size())) + { + this->merge_num = int(sources.size()); + } + + ~kway_merge() + { + sink = NULL; + } + + void merge() + { + int active_sources = (int)sources.size(); + for(int i=0; iget_next())); + } + + while(active_sources>0 || !m_heap.isempty()) + { + value_source v = m_heap.get_min(); + m_heap.pop_min(); + //std::cout<<"pop_min: "<has_more()) + { + m_heap.insert(value_source(v.source_id, sources[v.source_id]->get_next())); + } + else + { + active_sources--; + } + sink->Add(v.value); + } + sink->finish(); + } }; void do_merge() { - /* - while(0 != (mem_size % sizeof(tmp_in_edge))) - { - mem_size--; - } - */ - //exit(-1); - unsigned long long source_buf_size = ((mem_size/num_tmp_files)/sizeof(tmp_in_edge)) * sizeof(tmp_in_edge); - unsigned long long buf_edges = source_buf_size/sizeof(tmp_in_edge); - //std::cout<<"source_buf_size:"< sources; - for (unsigned int i = 0; i < num_tmp_files; i++) - { - //std::cout<<"source_"<::iterator iter; - for (iter=sources.begin(); iter!=sources.end(); iter++) - { - all_idx += (*iter)->idx; - } - std::cout<<"all in_file have a sum of "<idx<<" edges."< sources; + for (unsigned int i = 0; i < num_tmp_files; i++) + { + //std::cout<<"source_"<::iterator iter; + for (iter=sources.begin(); iter!=sources.end(); iter++) + { + all_idx += (*iter)->idx; + } + std::cout<<"all in_file have a sum of "<idx<<" edges."<value.src_vert < obj2.value.src_vert); + } +}; + +class src_kway_merge +{ + std::vector sources; + src_merge_sink * sink; + minheap m_heap; + int merge_num; + public: + src_kway_merge(std::vector sources, src_merge_sink* sink) : sources(sources),sink(sink),m_heap(int(sources.size())) + { + this->merge_num = int(sources.size()); + } + + ~src_kway_merge() + { + sink = NULL; + } + + void merge() + { + int active_sources = (int)sources.size(); + std::cout << "active_sources is " << active_sources << std::endl; + for(int i=0; iget_next())); + } + + while(active_sources>0 || !m_heap.isempty()) + { + src_value_source v = m_heap.get_min(); + m_heap.pop_min(); + if(sources[v.source_id]->has_more()) + { + m_heap.insert(src_value_source(v.source_id, sources[v.source_id]->get_next())); + } + else + { + active_sources--; + } + sink->Add(v.value); + } + sink->finish(); + } +}; + +void do_src_merge(char *tmp_out_dir, char *origin_edge_file) +{ + //buf1 save src data, buf2 save sorted data, so divided by 2 + unsigned long long source_buf_size = ((mem_size/2/num_tmp_files)/sizeof(tmp_in_edge)) * sizeof(tmp_in_edge); + unsigned long long buf_edges = source_buf_size/sizeof(tmp_in_edge); + + std::vector sources; + for (unsigned int i = 0; i < num_tmp_files; i++) + { + std::stringstream current_file_id; + current_file_id << i; + std::string current_file_name = std::string(prev_name_tmp_file) + current_file_id.str(); + tmp_in_edge * buf = (tmp_in_edge *)((char *)buf1 + i*source_buf_size); + sources.push_back(new merge_source(buf, buf_edges, current_file_name, file_len[i]) ); + } + /*init buf2*/ + memset( (char*)buf2, 0, each_buf_size*sizeof(struct tmp_in_edge)); + + std::string tmp_file_name(tmp_out_dir) ; + tmp_file_name += origin_edge_file; + tmp_file_name += "_sorted.txt"; + src_merge_sink* sink = new src_merge_sink(tmp_file_name); + + src_kway_merge k_merger(sources, sink); + k_merger.merge(); +//lilang test + std::cout << prev_name_tmp_file << std::endl; + for (unsigned int i = 0; i < num_tmp_files; i++) + { + std::stringstream delete_current_file_id; + delete_current_file_id << i; + std::string delete_current_file_name = std::string(prev_name_tmp_file) + delete_current_file_id.str(); + + std::cout << "delete tmp file " << delete_current_file_name << std::endl; + char tmp[1024]; + sprintf(tmp,"rm -rf %s", delete_current_file_name.c_str()); + int ret = system(tmp); + if (ret < 0) + assert(false); + } +} + diff --git a/convert/process_edgelist.cpp b/convert/process_edgelist.cpp index 65b28f7..01f4602 100644 --- a/convert/process_edgelist.cpp +++ b/convert/process_edgelist.cpp @@ -55,21 +55,24 @@ struct type2_edge type2_edge_buffer[EDGE_BUFFER_LEN]; void process_edgelist( const char* input_file_name, const char* edge_file_name, const char* vert_index_file_name, - const char * out_txt_file_name, - bool with_type1, - bool with_in_edge) + const char* out_txt_file_name, + const char* out_dir, + const char* origin_edge_file, + bool with_type1, + bool with_in_edge) { unsigned int recent_src_vert=UINT_MAX; unsigned int vert_buffer_offset=0; unsigned int edge_buffer_offset=0; unsigned int edge_suffix=0; unsigned int vert_suffix=0; - unsigned long long prev_out = 0; - - printf( "Start Processing %s.\nWill generate %s and %s in destination folder.\n", - input_file_name, edge_file_name, vert_index_file_name ); + unsigned long long prev_out = 0; + bool sort_needed = false;// judge the file that is right order - srand((unsigned int)time(NULL)); + printf( "Start Processing %s.\nWill generate %s and %s in destination folder.\n", + input_file_name, edge_file_name, vert_index_file_name ); + + srand((unsigned int)time(NULL)); in = fopen( input_file_name, "r" ); if( in == NULL ){ @@ -77,24 +80,24 @@ void process_edgelist( const char* input_file_name, exit(1); } - //out_txt = fopen(out_txt_file_name, "wt+"); - //if (out_txt == NULL) - // { - // printf("Cannot open the output txt file!\n"); - // exit(-1); - // } + //out_txt = fopen(out_txt_file_name, "wt+"); + //if (out_txt == NULL) + // { + // printf("Cannot open the output txt file!\n"); + // exit(-1); + // } edge_file = open( edge_file_name, O_CREAT|O_WRONLY, S_IRUSR ); if( edge_file == -1 ){ printf( "Cannot create edge list file:%s\nAborted..\n", - edge_file_name ); + edge_file_name ); exit( -1 ); } - + vert_index_file = open( vert_index_file_name, O_CREAT|O_WRONLY, S_IRUSR ); if( vert_index_file == -1 ){ printf( "Cannot create vertex index file:%s\nAborted..\n", - vert_index_file_name ); + vert_index_file_name ); exit( -1 ); } @@ -102,93 +105,165 @@ void process_edgelist( const char* input_file_name, memset( (char*)type2_edge_buffer, 0, EDGE_BUFFER_LEN*sizeof(struct type2_edge) ); memset( (char*)vert_buffer, 0, VERT_BUFFER_LEN*sizeof(struct vert_index) ); + //judge the file that is right order + while(read_one_edge() != CUSTOM_EOF ){ + //jump the ## + if ( num_edges == 0 ) + continue; + else if ( num_edges == 1 ) + recent_src_vert = src_vert; + else if ( recent_src_vert > src_vert ) { + sort_needed = true; + break; + } + else + recent_src_vert = src_vert; + } + + //init the global variable + line_no = 0; + num_edges = 0; + + //init the file pointer to the head of the file + fseek( in , 0 , SEEK_SET ); + + //sort the source + if( sort_needed != true ) + printf( "the file is sorted.\n" ); + else{ + printf( "sorting the file...\n" ); + while ( read_one_edge() != CUSTOM_EOF ){ + + //jump the ## + if (num_edges == 0) + continue; + //trace the vertex ids + if( src_vert < min_vertex_id ) min_vertex_id = src_vert; + if( dst_vert < min_vertex_id ) min_vertex_id = dst_vert; + + if( src_vert > max_vertex_id ) max_vertex_id = src_vert; + if( dst_vert > max_vertex_id ) max_vertex_id = dst_vert; + + (*(buf1 + current_buf_size)).src_vert = src_vert; + (*(buf1 + current_buf_size)).dest_vert = dst_vert; + current_buf_size++; + if (current_buf_size == each_buf_size) + { + //call function to sort and write back + std::cout << "copy " << current_buf_size << " edges to radix sort process." << std::endl; + wake_up_sort_src(file_id, current_buf_size, false); + current_buf_size = 0; + file_id++; + } + }//while EOF + + if (current_buf_size) + { + // printf("file_id is %d",file_id); + std::cout << "copy " << current_buf_size << " edges to radix sort process" << std::endl; + wake_up_sort_src(file_id, current_buf_size, true); + current_buf_size = 0; + } + + //init the global variable + line_no = 0; + num_edges = 0; + file_id = 0; + + //reinit the file pointer + char * tmp_out_dir; + tmp_out_dir = new char[strlen(out_dir)+1]; + strcpy(tmp_out_dir, out_dir); + std::string tmp_file (tmp_out_dir); + tmp_file += origin_edge_file; + tmp_file += "_sorted.txt"; + in = fopen( tmp_file.c_str(), "r" ); + if( in == NULL ){ + printf( "Cannot open the new sorted file!\n" ); + exit(1); + } + } + //parsing input file now. while ( read_one_edge() != CUSTOM_EOF ){ - //jump the ## - if (num_edges == 0) - continue; + //jump the ## + if (num_edges == 0) + continue; //trace the vertex ids + if( sort_needed == false){ if( src_vert < min_vertex_id ) min_vertex_id = src_vert; if( dst_vert < min_vertex_id ) min_vertex_id = dst_vert; if( src_vert > max_vertex_id ) max_vertex_id = src_vert; if( dst_vert > max_vertex_id ) max_vertex_id = dst_vert; - - //vertex id disorder. - if( src_vert < recent_src_vert ){ - if(num_edges > 1) - { - printf( "Edge order is not correct at line:%lld. Edge prcessing terminated.\n", line_no ); - fclose( in ); - exit(1); - } } //HANDLE THE EDGES //fill in the edge buffer, as well as the vertex id buffer edge_suffix = num_edges - (edge_buffer_offset * EDGE_BUFFER_LEN); - if (with_type1) - { - edge_buffer[edge_suffix].dest_vert = dst_vert; - edge_buffer[edge_suffix].edge_weight = produce_random_weight(); - //just for unit weight, needed by special sssp - //edge_buffer[edge_suffix].edge_weight = 1.0; - } - else - { - type2_edge_buffer[edge_suffix].dest_vert = dst_vert; - } - - - //add for in-edge if nessary - if (with_in_edge) - { - (*(buf1 + current_buf_size)).src_vert = src_vert; - (*(buf1 + current_buf_size)).dest_vert = dst_vert; - current_buf_size++; - if (current_buf_size == each_buf_size) - { - //call function to sort and write back - std::cout << "copy " << current_buf_size << " edges to radix sort process." << std::endl; - wake_up_sort(file_id, current_buf_size, false); - current_buf_size = 0; - file_id++; - } - } - - //write to out_txt file - //fprintf(out_txt, "%d\t%d\t%f\n", src_vert, dst_vert, edge_buffer[edge_suffix].edge_weight); - - //write to tmp-in-edge file - //insert_sort_for_buf(src_vert, dst_vert); + + if (with_type1) + { + edge_buffer[edge_suffix].dest_vert = dst_vert; + edge_buffer[edge_suffix].edge_weight = produce_random_weight(); + //just for unit weight, needed by special sssp + //edge_buffer[edge_suffix].edge_weight = 1.0; + } + else + { + type2_edge_buffer[edge_suffix].dest_vert = dst_vert; + } + + + //add for in-edge if nessary + if (with_in_edge) + { + (*(buf1 + current_buf_size)).src_vert = src_vert; + (*(buf1 + current_buf_size)).dest_vert = dst_vert; + current_buf_size++; + if (current_buf_size == each_buf_size) + { + //call function to sort and write back + std::cout << "copy " << current_buf_size << " edges to radix sort process." << std::endl; + wake_up_sort(file_id, current_buf_size, false); + current_buf_size = 0; + file_id++; + } + } + + //write to out_txt file + //fprintf(out_txt, "%d\t%d\t%f\n", src_vert, dst_vert, edge_buffer[edge_suffix].edge_weight); + + //write to tmp-in-edge file + //insert_sort_for_buf(src_vert, dst_vert); if (edge_suffix == (EDGE_BUFFER_LEN-1)){ - if (with_type1) - { - flush_buffer_to_file( edge_file, (char*)edge_buffer, - EDGE_BUFFER_LEN*sizeof(struct edge) ); - memset( (char*)edge_buffer, 0, EDGE_BUFFER_LEN*sizeof(struct edge) ); - } - else - { - flush_buffer_to_file( edge_file, (char*)type2_edge_buffer, - EDGE_BUFFER_LEN*sizeof(struct type2_edge) ); - memset( (char*)type2_edge_buffer, 0, EDGE_BUFFER_LEN*sizeof(struct type2_edge) ); - } + if (with_type1) + { + flush_buffer_to_file( edge_file, (char*)edge_buffer, + EDGE_BUFFER_LEN*sizeof(struct edge) ); + memset( (char*)edge_buffer, 0, EDGE_BUFFER_LEN*sizeof(struct edge) ); + } + else + { + flush_buffer_to_file( edge_file, (char*)type2_edge_buffer, + EDGE_BUFFER_LEN*sizeof(struct type2_edge) ); + memset( (char*)type2_edge_buffer, 0, EDGE_BUFFER_LEN*sizeof(struct type2_edge) ); + } edge_buffer_offset += 1; } //is source vertex id continuous? if (src_vert != recent_src_vert){ - if ( max_out_edges < (num_edges - prev_out) ) - max_out_edges = num_edges - prev_out; - prev_out = num_edges; + if ( max_out_edges < (num_edges - prev_out) ) + max_out_edges = num_edges - prev_out; + prev_out = num_edges; //add a new record in vertex id index if (src_vert >= (vert_buffer_offset+1) * VERT_BUFFER_LEN ){ vert_buffer_offset += 1; flush_buffer_to_file( vert_index_file, (char*)vert_buffer, - VERT_BUFFER_LEN*sizeof(struct vert_index) ); + VERT_BUFFER_LEN*sizeof(struct vert_index) ); memset( (char*)vert_buffer, 0, VERT_BUFFER_LEN*sizeof(struct vert_index) ); } vert_suffix = src_vert - vert_buffer_offset * VERT_BUFFER_LEN; @@ -198,26 +273,26 @@ void process_edgelist( const char* input_file_name, } }//while EOF - if (current_buf_size) - { - std::cout << "copy " << current_buf_size << " edges to radix sort process" << std::endl; - wake_up_sort(file_id, current_buf_size, true); - current_buf_size = 0; - } + if (current_buf_size) + { + std::cout << "copy " << current_buf_size << " edges to radix sort process" << std::endl; + wake_up_sort(file_id, current_buf_size, true); + current_buf_size = 0; + } //should flush the remaining data of both edge buffer and vertex index buffer to file! - if (with_type1) - flush_buffer_to_file( edge_file, (char*)edge_buffer, + if (with_type1) + flush_buffer_to_file( edge_file, (char*)edge_buffer, EDGE_BUFFER_LEN*sizeof(edge) ); - else - flush_buffer_to_file( edge_file, (char*)type2_edge_buffer, + else + flush_buffer_to_file( edge_file, (char*)type2_edge_buffer, EDGE_BUFFER_LEN*sizeof(type2_edge) ); flush_buffer_to_file( vert_index_file, (char*)vert_buffer, - VERT_BUFFER_LEN*sizeof(vert_index) ); + VERT_BUFFER_LEN*sizeof(vert_index) ); //finished processing fclose( in ); - //fclose(out_txt); + //fclose(out_txt); close( edge_file ); close( vert_index_file ); } @@ -231,15 +306,15 @@ void process_edgelist( const char* input_file_name, */ int flush_buffer_to_file( int fd, char* buffer, unsigned int size ) { - unsigned int n, offset, remaining, res; - n = offset = 0; - remaining = size; - while(n 0 ? size:4096, - PROT_READ|PROT_WRITE, - MAP_ANONYMOUS|MAP_SHARED, -1, 0); - printf( "Engine::map_anon_memory had allocated 0x%llx bytes at %llx\n", size, (u64_t)space); - if(space == MAP_FAILED) { - std::cerr << "mmap_anon_mem -- allocation " << "Error!\n"; - exit(-1); - } - if(mlocked) { - if(mlock(space, size) < 0) { - std::cerr << "mmap_anon_mem -- mlock " << "Error!\n"; - } - } - if(zero) { - memset(space, 0, size); - } - return space; + void *space = mmap(NULL, size > 0 ? size:4096, + PROT_READ|PROT_WRITE, + MAP_ANONYMOUS|MAP_SHARED, -1, 0); + printf( "Engine::map_anon_memory had allocated 0x%llx bytes at %llx\n", size, (u64_t)space); + if(space == MAP_FAILED) { + std::cerr << "mmap_anon_mem -- allocation " << "Error!\n"; + exit(-1); + } + if(mlocked) { + if(mlock(space, size) < 0) { + std::cerr << "mmap_anon_mem -- mlock " << "Error!\n"; + } + } + if(zero) { + memset(space, 0, size); + } + return space; } void do_io_work(const char *file_name_in, u32_t operation, char* buf, u64_t offset_in, u64_t size_in) { - int fd; - switch(operation) - { - case READ_FILE: - { - int read_finished = 0, remain = size_in, res; - fd = open(file_name_in, O_RDWR, S_IRUSR | S_IRGRP | S_IROTH); - if (fd < 0) - { - printf( "Cannot open attribute file for writing!\n"); - exit(-1); - } - if (lseek(fd, offset_in, SEEK_SET) < 0) - { - printf( "Cannot seek the attribute file!\n"); - exit(-1); - } - while (read_finished < (int)size_in) - { - if( (res = read(fd, buf, remain)) < 0 ) - { - printf( "Cannot seek the attribute file!\n"); - exit(-1); - } - read_finished += res; - remain -= res; - } - close(fd); - break; - } - case WRITE_FILE: - { - int written = 0, remain = size_in, res; - fd = open(file_name_in, O_RDWR, S_IRUSR | S_IRGRP | S_IROTH); - if (fd < 0) - { - printf( "Cannot open attribute file for writing!\n"); - exit(-1); - } - if (lseek(fd, offset_in, SEEK_SET) < 0) - { - printf( "Cannot seek the attribute file!\n"); - exit(-1); - } - while (written < (int)size_in) - { - if( (res = write(fd, buf, remain)) < 0 ) - { - printf( "Cannot seek the attribute file!\n"); - exit(-1); - } - written += res; - remain -= res; - } - close(fd); - break; - } - } + int fd; + switch(operation) + { + case READ_FILE: + { + int read_finished = 0, remain = size_in, res; + fd = open(file_name_in, O_RDWR, S_IRUSR | S_IRGRP | S_IROTH); + if (fd < 0) + { + printf( "Cannot open attribute file for writing!\n"); + exit(-1); + } + if (lseek(fd, offset_in, SEEK_SET) < 0) + { + printf( "Cannot seek the attribute file!\n"); + exit(-1); + } + while (read_finished < (int)size_in) + { + if( (res = read(fd, buf, remain)) < 0 ) + { + printf( "Cannot seek the attribute file!\n"); + exit(-1); + } + read_finished += res; + remain -= res; + } + close(fd); + break; + } + case WRITE_FILE: + { + int written = 0, remain = size_in, res; + fd = open(file_name_in, O_RDWR, S_IRUSR | S_IRGRP | S_IROTH); + if (fd < 0) + { + printf( "Cannot open attribute file for writing!\n"); + exit(-1); + } + if (lseek(fd, offset_in, SEEK_SET) < 0) + { + printf( "Cannot seek the attribute file!\n"); + exit(-1); + } + while (written < (int)size_in) + { + if( (res = write(fd, buf, remain)) < 0 ) + { + printf( "Cannot seek the attribute file!\n"); + exit(-1); + } + written += res; + remain -= res; + } + close(fd); + break; + } + } } -void process_in_edge(u64_t mem_size, - const char * edge_file_name, - const char * out_dir) +char* process_in_edge(u64_t mem_size, + const char * edge_file_name, + const char * out_dir) { - /*struct stat st; - u64_t edge_file_size; - //open the edge file - in_edge_fd = fopen(edge_file_name, "r"); - if (in_edge_fd < 0) - { - printf("Cannot open edge_file : %s\n", edge_file_name); - exit(-1); - } - //fstat(in_edge_fd, &st); - stat(edge_file_name, &st); - edge_file_size = (u64_t)st.st_size; - fclose(in_edge_fd); - printf( "edge file size:%lld(MBytes)\n", edge_file_size/(1024*1024) ); - printf( "edge file size:%lld\n", edge_file_size ); - exit(-1);*/ - tmp_out_dir = new char[strlen(out_dir)+1]; - strcpy(tmp_out_dir, out_dir); - - origin_edge_file = new char[strlen(edge_file_name)+1]; - strcpy(origin_edge_file, edge_file_name); - //determine how many files to sort - /*u64_t per_file_size; - if (mem_size >= (2*edge_file_size)) - { - num_parts = 1; - per_file_size = edge_file_size; - } - else - { - num_parts = ((edge_file_size)%(mem_size)) == 0 ? - (u32_t)(edge_file_size/mem_size) - :(u32_t)(edge_file_size/mem_size + 1); - per_file_size = mem_size/2; - }*/ - - num_parts = 0; - each_buf_len = mem_size/2; - each_buf_size = (u64_t)mem_size/(sizeof(struct tmp_in_edge)*2); - current_buf_size = 0; - current_file_id = 0; - //std::cout << "each_buf_len = " << total_buf_len << std::endl; - //std::cout << "each_buf_size = " << total_buf_size << std::endl; - //std::cout << "current_buf_size = " << current_buf_size << std::endl; - - /*for (u32_t i = 0; i < num_parts; i++) - { - if (i == num_parts - 1 && (edge_file_size%(mem_size) != 0)) - file_len[i] = edge_file_size%mem_size; - else - file_len[i] = per_file_size; - - std::cout << "Init for each file:" << file_len[i] << std::endl; - }*/ - - buf_for_sort = (char *)map_anon_memory(mem_size, true, true ); - edge_buf_for_sort = (struct tmp_in_edge *)buf_for_sort; - buf1 = (struct tmp_in_edge *)buf_for_sort; - buf2 = (struct tmp_in_edge *)(buf_for_sort + each_buf_len); - //printf("the address of edge_buf_for_sort is %llx\n", (u64_t)edge_buf_for_sort); - //printf("the address of buf1 is %llx\n", (u64_t)buf1); - //printf("the address of buf2 is %llx\n", (u64_t)buf2); + /*struct stat st; + u64_t edge_file_size; + //open the edge file + in_edge_fd = fopen(edge_file_name, "r"); + if (in_edge_fd < 0) + { + printf("Cannot open edge_file : %s\n", edge_file_name); + exit(-1); + } + //fstat(in_edge_fd, &st); + stat(edge_file_name, &st); + edge_file_size = (u64_t)st.st_size; + fclose(in_edge_fd); + printf( "edge file size:%lld(MBytes)\n", edge_file_size/(1024*1024) ); + printf( "edge file size:%lld\n", edge_file_size ); + exit(-1);*/ + tmp_out_dir = new char[strlen(out_dir)+1]; + strcpy(tmp_out_dir, out_dir); + + origin_edge_file = new char[strlen(edge_file_name)+1]; + strcpy(origin_edge_file, edge_file_name); + //determine how many files to sort + /*u64_t per_file_size; + if (mem_size >= (2*edge_file_size)) + { + num_parts = 1; + per_file_size = edge_file_size; + } + else + { + num_parts = ((edge_file_size)%(mem_size)) == 0 ? + (u32_t)(edge_file_size/mem_size) + :(u32_t)(edge_file_size/mem_size + 1); + per_file_size = mem_size/2; + }*/ + + num_parts = 0; + each_buf_len = mem_size/2; + each_buf_size = (u64_t)mem_size/(sizeof(struct tmp_in_edge)*2); + current_buf_size = 0; + current_file_id = 0; + //std::cout << "each_buf_len = " << total_buf_len << std::endl; + //std::cout << "each_buf_size = " << total_buf_size << std::endl; + //std::cout << "current_buf_size = " << current_buf_size << std::endl; + + /*for (u32_t i = 0; i < num_parts; i++) + { + if (i == num_parts - 1 && (edge_file_size%(mem_size) != 0)) + file_len[i] = edge_file_size%mem_size; + else + file_len[i] = per_file_size; + + std::cout << "Init for each file:" << file_len[i] << std::endl; + }*/ + + buf_for_sort = (char *)map_anon_memory(mem_size, true, true ); + edge_buf_for_sort = (struct tmp_in_edge *)buf_for_sort; + buf1 = (struct tmp_in_edge *)buf_for_sort; + buf2 = (struct tmp_in_edge *)(buf_for_sort + each_buf_len); + //printf("the address of edge_buf_for_sort is %llx\n", (u64_t)edge_buf_for_sort); + //printf("the address of buf1 is %llx\n", (u64_t)buf1); + //printf("the address of buf2 is %llx\n", (u64_t)buf2); + return buf_for_sort; } void wake_up_sort(u32_t file_id, u64_t buf_size, bool final_call) { - //std::cout << "in wakeup_sort, file_id = " << file_id; - //std::cout <<", buf_size = " << buf_size << std::endl; - - - - //start sort for this buffer - radix_sort(buf1, buf2, buf_size, max_vertex_id); - //exit(-1); - //for (u64_t i = 0; i < buf_size; i++) - //{ - // std::cout << "src_vert->dest_vert = " << (*(buf1+i)).src_vert << "->" << (*(buf1+i)).dest_vert <dst = " << src_vert << "->" << dest_vert << std::endl; - tmp_num_edges ++; - - //set the type2_edge_buffer - edge_suffix = tmp_num_edges - (edge_buffer_offset * EDGE_BUFFER_LEN); - in_edge_buffer[edge_suffix].in_vert = src_vert; - - //write back if necessary - if (edge_suffix == (EDGE_BUFFER_LEN - 1)) - { - flush_buffer_to_file( tmp_out_in_edge_file, (char*)in_edge_buffer, - EDGE_BUFFER_LEN*sizeof(struct in_edge) ); - memset( (char*)in_edge_buffer, 0, EDGE_BUFFER_LEN*sizeof(struct in_edge) ); - - edge_buffer_offset += 1; - } - - //is source vertex id continuous? - if (dest_vert != recent_src_vert) - { - if (dest_vert >= (vert_buffer_offset + 1)*VERT_BUFFER_LEN) - { - vert_buffer_offset += 1; - flush_buffer_to_file( tmp_out_in_index_file, (char*)in_vert_buffer, - VERT_BUFFER_LEN*sizeof(struct vert_index) ); - memset( (char*)in_vert_buffer , 0, VERT_BUFFER_LEN*sizeof(struct vert_index) ); - } - vert_suffix = dest_vert - vert_buffer_offset * VERT_BUFFER_LEN; - in_vert_buffer[vert_suffix].offset = tmp_num_edges; - - recent_src_vert = dest_vert; - } - } - - flush_buffer_to_file( tmp_out_in_edge_file, (char*)in_edge_buffer, - EDGE_BUFFER_LEN*sizeof(in_edge) ); - flush_buffer_to_file( tmp_out_in_index_file, (char*)in_vert_buffer, - VERT_BUFFER_LEN*sizeof(vert_index) ); - - close(tmp_out_in_edge_file); - close(tmp_out_in_index_file); - - /*std::string tmp_out_txt(tmp_out_dir); - tmp_out_txt += origin_edge_file; - tmp_out_txt += "-out.txt"; - FILE *tmp_out_file = fopen(tmp_out_txt.c_str(), "wt+"); - if (tmp_out_file == NULL) - { - assert(false); - } - - for (u64_t i = 0; i < buf_size; i++) - { - fprintf(tmp_out_file, "%d\t%d\t\n", (*(buf1+i)).src_vert, (*(buf1+i)).dest_vert); - } - fclose(tmp_out_file);*/ - - } - else - { - std::stringstream str_file_id; - str_file_id << file_id; - std::string tmp_in_file_name(tmp_out_dir) ; - tmp_in_file_name += origin_edge_file; - tmp_in_file_name += "-tmp_in_edge_file_" + str_file_id.str(); - //std::cout << "file " << tmp_in_file_name << std::endl; - int tmp_in_file = open( tmp_in_file_name.c_str(), O_CREAT|O_WRONLY, S_IRUSR ); - if( tmp_in_file == -1 ) - { - printf( "Cannot tmp_in_file: %s\nAborted..\n", tmp_in_file_name.c_str()); - exit( -1 ); - } - flush_buffer_to_file( tmp_in_file, (char*)buf1, (buf_size*sizeof(tmp_in_edge))); - //memset( (char*)buf1, 0, total_buf_len); - close(tmp_in_file); - - if (final_call) - { - num_tmp_files = file_id + 1; - file_len = new u64_t[file_id + 1]; - for (u32_t i = 0; i <= file_id; i++) - { - if (i == file_id) - file_len[i] = buf_size*sizeof(tmp_in_edge); - else - file_len[i] = each_buf_len; - } - - for (u32_t j = 0; j <= file_id; j++) - std::cout << "the size of tmp file[" <dest_vert = " << (*(buf1+i)).src_vert << "->" << (*(buf1+i)).dest_vert <dst = " << src_vert << "->" << dest_vert << std::endl; + tmp_num_edges ++; + + //set the type2_edge_buffer + edge_suffix = tmp_num_edges - (edge_buffer_offset * EDGE_BUFFER_LEN); + in_edge_buffer[edge_suffix].in_vert = src_vert; + + //write back if necessary + if (edge_suffix == (EDGE_BUFFER_LEN - 1)) + { + flush_buffer_to_file( tmp_out_in_edge_file, (char*)in_edge_buffer, + EDGE_BUFFER_LEN*sizeof(struct in_edge) ); + memset( (char*)in_edge_buffer, 0, EDGE_BUFFER_LEN*sizeof(struct in_edge) ); + + edge_buffer_offset += 1; + } + + //is source vertex id continuous? + if (dest_vert != recent_src_vert) + { + if (dest_vert >= (vert_buffer_offset + 1)*VERT_BUFFER_LEN) + { + vert_buffer_offset += 1; + flush_buffer_to_file( tmp_out_in_index_file, (char*)in_vert_buffer, + VERT_BUFFER_LEN*sizeof(struct vert_index) ); + memset( (char*)in_vert_buffer , 0, VERT_BUFFER_LEN*sizeof(struct vert_index) ); + } + vert_suffix = dest_vert - vert_buffer_offset * VERT_BUFFER_LEN; + in_vert_buffer[vert_suffix].offset = tmp_num_edges; + + recent_src_vert = dest_vert; + } + } + + flush_buffer_to_file( tmp_out_in_edge_file, (char*)in_edge_buffer, + EDGE_BUFFER_LEN*sizeof(in_edge) ); + flush_buffer_to_file( tmp_out_in_index_file, (char*)in_vert_buffer, + VERT_BUFFER_LEN*sizeof(vert_index) ); + + close(tmp_out_in_edge_file); + close(tmp_out_in_index_file); + + printf("no merge and mem_size is %lld\n", mem_size); + + /*std::string tmp_out_txt(tmp_out_dir); + tmp_out_txt += origin_edge_file; + tmp_out_txt += "-out.txt"; + FILE *tmp_out_file = fopen(tmp_out_txt.c_str(), "wt+"); + if (tmp_out_file == NULL) + { + assert(false); + } + + for (u64_t i = 0; i < buf_size; i++) + { + fprintf(tmp_out_file, "%d\t%d\t\n", (*(buf1+i)).src_vert, (*(buf1+i)).dest_vert); + } + fclose(tmp_out_file);*/ + + } + else + { + std::stringstream str_file_id; + str_file_id << file_id; + std::string tmp_in_file_name(tmp_out_dir) ; + tmp_in_file_name += origin_edge_file; + tmp_in_file_name += "-tmp_in_edge_file_" + str_file_id.str(); + //std::cout << "file " << tmp_in_file_name << std::endl; + int tmp_in_file = open( tmp_in_file_name.c_str(), O_CREAT|O_WRONLY, S_IRUSR ); + if( tmp_in_file == -1 ) + { + printf( "Cannot tmp_in_file: %s\nAborted..\n", tmp_in_file_name.c_str()); + exit( -1 ); + } + flush_buffer_to_file( tmp_in_file, (char*)buf1, (buf_size*sizeof(tmp_in_edge))); + //memset( (char*)buf1, 0, total_buf_len); + close(tmp_in_file); + + if (final_call) + { + num_tmp_files = file_id + 1; + file_len = new u64_t[file_id + 1]; + for (u32_t i = 0; i <= file_id; i++) + { + if (i == file_id) + file_len[i] = buf_size*sizeof(tmp_in_edge); + else + file_len[i] = each_buf_len; + } + + for (u32_t j = 0; j <= file_id; j++) + std::cout << "the size of tmp file[" <>bits_offset); - tmp[j] = radix_value; - u64_t k = radix_value; - counts[k]++; - } - u64_t s = 0; - for (u64_t i = 0; i < left_move; i++) - { - s += counts[i]; - counts[i] = s; - } + u64_t left_move = 1L << rbits; + u64_t radix_value; + for (u64_t i = 0; i < left_move; i++) + { + counts[i] = 0; + } + for (u64_t j = 0; j < num_edges; j++) + { + if(is_src != true) + radix_value = ((1 << rbits) - 1) & (((*(buf_1+j)).dest_vert)>>bits_offset); + else + radix_value = ((1 << rbits) - 1) & (((*(buf_1+j)).src_vert)>>bits_offset); + tmp[j] = radix_value; + u64_t k = radix_value; + counts[k]++; + } + u64_t s = 0; + for (u64_t i = 0; i < left_move; i++) + { + s += counts[i]; + counts[i] = s; + } - for (u64_t j = num_edges-1; (int)j >= (int)0; j--) - { - u64_t x = --counts[tmp[j]]; - (*(buf_2+x)).src_vert = (*(buf_1+j)).src_vert; - (*(buf_2+x)).dest_vert = (*(buf_1+j)).dest_vert; - } - //std::cout << "end radix_step" << std::endl; + for (u64_t j = num_edges-1; (int)j >= (int)0; j--) + { + u64_t x = --counts[tmp[j]]; + (*(buf_2+x)).src_vert = (*(buf_1+j)).src_vert; + (*(buf_2+x)).dest_vert = (*(buf_1+j)).dest_vert; + } + //std::cout << "end radix_step" << std::endl; } -template + template u64_t log2up(T i) { - u64_t a = 0; - while ((1L << a) <= i) a++; - return a; + u64_t a = 0; + while ((1L << a) <= i) a++; + return a; } void radix_sort(struct tmp_in_edge * buf_1, struct tmp_in_edge * buf_2, - u64_t num_edges, unsigned int max_vert_id) + u64_t num_edges, unsigned int max_vert_id, bool is_src) { - u64_t bits = log2up(max_vert_id); - u8_t *tmp = (u8_t *)malloc(sizeof(u8_t)*num_edges); - u64_t *counts = (u64_t *)malloc(sizeof(u64_t)*BUCKETS); + u64_t bits = log2up(max_vert_id); + u8_t *tmp = (u8_t *)malloc(sizeof(u8_t)*num_edges); + u64_t *counts = (u64_t *)malloc(sizeof(u64_t)*BUCKETS); - u64_t rounds = 1 + (bits-1)/MAX_RADIX; - u64_t rbits = 1 + (bits-1)/rounds; - u64_t bit_offset = 0; - bool flipped = 0; + u64_t rounds = 1 + (bits-1)/MAX_RADIX; + u64_t rbits = 1 + (bits-1)/rounds; + u64_t bit_offset = 0; + bool flipped = 0; - while (bit_offset < bits) - { - if (bit_offset + rbits > bits) rbits = bits - bit_offset; - //std::cout << "bit_offset = " << bit_offset << std::endl; - //std::cout << "bits = " << bits << std::endl; - //std::cout << "rbits = " << rbits << std::endl; - //std::cout << "rounds = " << rounds << std::endl; + while (bit_offset < bits) + { + if (bit_offset + rbits > bits) rbits = bits - bit_offset; + //std::cout << "bit_offset = " << bit_offset << std::endl; + //std::cout << "bits = " << bits << std::endl; + //std::cout << "rbits = " << rbits << std::endl; + //std::cout << "rounds = " << rounds << std::endl; - if (flipped) - radix_step(buf_2, buf_1, tmp, counts, num_edges, rbits, bit_offset); - else - radix_step(buf_1, buf_2, tmp, counts, num_edges, rbits, bit_offset); + if (flipped) + radix_step(buf_2, buf_1, tmp, counts, num_edges, rbits, bit_offset, is_src); + else + radix_step(buf_1, buf_2, tmp, counts, num_edges, rbits, bit_offset, is_src); - bit_offset += rbits; - flipped = !flipped; - } - //std::cout << "end here!" << std::endl; - if (flipped) - for (u64_t i = 0; i < num_edges; i++) - { - (*(buf_1+i)).src_vert = (*(buf_2+i)).src_vert; - (*(buf_1+i)).dest_vert = (*(buf_2+i)).dest_vert; - } + bit_offset += rbits; + flipped = !flipped; + } + //std::cout << "end here!" << std::endl; + if (flipped) + for (u64_t i = 0; i < num_edges; i++) + { + (*(buf_1+i)).src_vert = (*(buf_2+i)).src_vert; + (*(buf_1+i)).dest_vert = (*(buf_2+i)).dest_vert; + } - free(tmp); - free(counts); + free(tmp); + free(counts); } diff --git a/fogsrc/greedy_coloring.cpp b/fogsrc/greedy_coloring.cpp new file mode 100644 index 0000000..c7fd39a --- /dev/null +++ b/fogsrc/greedy_coloring.cpp @@ -0,0 +1,96 @@ +/************************************************************************************************** + * Authors: + * lilang + * + * Routines: + * greedy_coloring: + * 1. + * 2. + * 3. + * + * There is some problems for big graph. + *************************************************************************************************/ + +#ifndef __GREEDY_COLORING_H__ +#define __GREEDY_COLORING_H__ + +#include "../headers/types.hpp" +#include "../headers/fog_engine.hpp" +#include "../headers/index_vert_array.hpp" +#include "print_debug.hpp" +#include "limits.h" + + +template +class Greedy_coloring{ + public: + void run() + { + index_vert_array * vert_index = new index_vert_array; + + //store all vertices' color + u32_t *color = new u32_t[gen_config.max_vert_id+1]; + u32_t *color_used = new u32_t[gen_config.max_vert_id+1];//store the label's times of presentation + u32_t max_color_num = 0; + + u32_t tmp_vert_id = gen_config.min_vert_id; + u32_t max_vert_id = gen_config.max_vert_id; + u32_t v_num_edges = 0; + T edge; + + //init the array + for(u32_t i=0;inum_edges(tmp_vert_id, OUT_EDGE); + if(v_num_edges == 0) + continue; + for(unsigned int i=0;iget_out_edge(tmp_vert_id, i, edge); + if(color[edge.dest_vert] != 0) + color_used[color[edge.dest_vert]]=1; + } + + for(unsigned int i=1;iget_out_edge(tmp_vert_id, i, edge); + color_used[color[edge.dest_vert]] = 0; + } + + tmp_vert_id++; + } + + + //statistics + std::cout << "the total number of colors is: " << max_color_num << std::endl; + + //print the result + for(u32_t i=gen_config.min_vert_id;i +class Hash_program{ + public: + void run() + { + index_vert_array * vert_index = new index_vert_array; + + int partition_num = 0; + u64_t * in_deg_array = new u64_t[gen_config.num_processors]; + memset(in_deg_array, 0, gen_config.num_processors*sizeof(u64_t)); + u64_t * out_deg_array = new u64_t[gen_config.num_processors]; + memset(out_deg_array, 0, gen_config.num_processors*sizeof(u64_t)); + for (u32_t i = gen_config.min_vert_id; i <= gen_config.max_vert_id; i++) + { + partition_num = i % gen_config.num_processors; + in_deg_array[partition_num] += vert_index->num_edges(i, IN_EDGE); + out_deg_array[partition_num] += vert_index->num_edges(i, OUT_EDGE); + } + u64_t in_deg_sum = 0; + u64_t out_deg_sum = 0; + + for (u32_t j = 0; j < gen_config.num_processors; j++) + { + PRINT_DEBUG("Partition %d, in_deg = %lld, out_deg = %lld\n", j, in_deg_array[j], out_deg_array[j]); + in_deg_sum += in_deg_array[j]; + out_deg_sum += out_deg_array[j]; + } + assert(in_deg_sum==gen_config.num_edges); + assert(out_deg_sum==gen_config.num_edges); + //PRINT_DEBUG("in_deg_sum = %lld\n", in_deg_sum); + //PRINT_DEBUG("out_deg_sum = %lld\n", out_deg_sum); + + double in_deg_cv = cal_cv(in_deg_array, in_deg_array+gen_config.num_processors); + double out_deg_cv = cal_cv(out_deg_array, out_deg_array+gen_config.num_processors); + + //PRINT_DEBUG_LOG("In degree cv is: %f ; Out degree cv is: %f\n", in_deg_cv, out_deg_cv); + PRINT_DEBUG("In degree cv is: %.3f%% ; Out degree cv is: %.3f%%\n", in_deg_cv*100, out_deg_cv*100); + + + } +}; +double cal_cv(u64_t * start, u64_t * end) +{ + double average = 0.0; + double stan_dev = 0.0; + double cv = 0.0; + double temp = 0.0; + for (const u64_t * ptr = start; ptr != end; ptr++) + { + average += static_cast(*ptr); + } + + average /= gen_config.num_processors; + for (const u64_t * ptr = start; ptr != end; ptr++) + { + temp = average - static_cast(*ptr); + stan_dev += pow(temp, 2); + } + stan_dev /= gen_config.num_processors; + stan_dev = sqrt(stan_dev); + cv = stan_dev/average; + return cv; +} + +#endif diff --git a/fogsrc/lpa.cpp b/fogsrc/lpa.cpp new file mode 100644 index 0000000..e7b26d8 --- /dev/null +++ b/fogsrc/lpa.cpp @@ -0,0 +1,187 @@ +/************************************************************************************************** + * Authors: + * lilang + * + * Routines: + * LPA: 1. give every vertex a unique label + * 2. update a vertex's label using the label presents most among its neibors + * if there are more than one and its label is in the set, don't change, and if not, choose randomly + * 3. repeat 2, until all vertices' labels don't change. + * + * this is sync. + * It is very slow to processing big graph. + *************************************************************************************************/ + +#ifndef __LPA_PROGRAM_H__ +#define __LPA_PROGRAM_H__ + +#include "../headers/types.hpp" +#include "../headers/fog_engine.hpp" +#include "../headers/index_vert_array.hpp" +#include "print_debug.hpp" +#include "limits.h" + +//#define + +template +class LPA_program{ + public: + void run() + { + index_vert_array * vert_index = new index_vert_array; + + bool convergence = false; + //store the old label, and the new label + u32_t **label = new u32_t*[3];//assume the graph is not sparse, assume the graph just has two-state oscillation + for(int i=0;i<3;i++) + label[i] = new u32_t[gen_config.max_vert_id+1]; + u32_t *label_times = new u32_t[gen_config.max_vert_id+1];//store the label's times of presentation + u32_t *label_of_same_times = new u32_t[gen_config.max_vert_id+1];//store the labels present most + u32_t reverse = 0;//which row stores the old label + u32_t niters = 0; + + std::cout << "123" << std::endl; + //init vertex's label(equal to its id) + for(u32_t i=0;i= 3){ + bool oscillation = true; + for(u32_t i=gen_config.min_vert_id;inum_edges(tmp_vert_id, OUT_EDGE); + if(v_num_edges == 0) + continue; + u32_t times = 0; + u32_t tmp_times; + for(unsigned int i=0;iget_out_edge(tmp_vert_id, i, edge); + label_times[label[reverse][edge.dest_vert]]++; + } + + //find the label presents most + u32_t key = 0; + u32_t neibor_label; + for(unsigned int i=0;iget_out_edge(tmp_vert_id, i, edge); + neibor_label = label[reverse][edge.dest_vert]; + tmp_times = label_times[neibor_label]; + if(times < tmp_times){ + times = tmp_times; + key = 0; + label_of_same_times[key] = neibor_label; + }else if(times == tmp_times){ + bool go_to_next = false; + for(unsigned int j=0;j<=key;j++) + if(neibor_label == label_of_same_times[key]) + go_to_next = true; + if(go_to_next == false){ + key++; + label_of_same_times[key] = neibor_label; + } + } + //if the times is equal, randomly choose + } + + //take its label into consideration + bool update = true; + for(unsigned int k=0;k<=key;k++) + if(label[reverse][tmp_vert_id] == label_of_same_times[k]){ + label_most = label[reverse][tmp_vert_id]; + update = false; + break; + } + if(update == true){ + srand( (unsigned)time(NULL) - niters); + u32_t random = rand()%(key+1); + label_most = label_of_same_times[random]; + } + + //init + for(unsigned int i=0;iget_out_edge(tmp_vert_id, i, edge); + label_times[label[reverse][edge.dest_vert]] = 0; + } + + if(label[reverse][tmp_vert_id] != label_most){ + label[(reverse+1)%3][tmp_vert_id] = label_most; + convergence = false; + }else{ + label[(reverse+1)%3][tmp_vert_id] = label[reverse][tmp_vert_id]; + } + + tmp_vert_id++; + } + + reverse = (reverse+1)%3; + } + + //statistics + u32_t label_s[gen_config.max_vert_id+1]; + int count = 0; + memset(label_s, -1, (gen_config.max_vert_id+1) * sizeof(int)) ; + for(u32_t i=gen_config.min_vert_id;i +class LPA_async_program{ + public: + void run() + { + index_vert_array * vert_index = new index_vert_array; + + bool convergence = false; + //store the old label, and the new label + u32_t *label = new u32_t[gen_config.max_vert_id+1]; + u32_t *label_times = new u32_t[gen_config.max_vert_id+1];//store the label's times of presentation + u32_t *label_of_same_times = new u32_t[gen_config.max_vert_id+1];//store the labels present most + u32_t niters = 0; + + //init vertex's label(equal to its id) + for(u32_t i=0;inum_edges(tmp_vert_id, OUT_EDGE); + if(v_num_edges == 0) + continue; + u32_t times = 0; + u32_t tmp_times; + for(unsigned int i=0;iget_out_edge(tmp_vert_id, i, edge); + label_times[label[edge.dest_vert]]++; + } + + //find the label presents most + u32_t key = 0; + u32_t neibor_label; + for(unsigned int i=0;iget_out_edge(tmp_vert_id, i, edge); + neibor_label = label[edge.dest_vert]; + tmp_times = label_times[neibor_label]; + if(times < tmp_times){ + times = tmp_times; + key = 0; + label_of_same_times[key] = neibor_label; + }else if(times == tmp_times){ + bool go_to_next = false; + for(unsigned int j=0;j<=key;j++) + if(neibor_label == label_of_same_times[key]) + go_to_next = true; + if(go_to_next == false){ + key++; + label_of_same_times[key] = neibor_label; + } + } + //if the times is equal, randomly choose + } + + //take its label into consideration + bool update = true; + for(unsigned int k=0;k<=key;k++) + if(label[tmp_vert_id] == label_of_same_times[k]){ + label_most = label[tmp_vert_id]; + update = false; + break; + } + if(update == true){ + srand( (unsigned)time(NULL) - niters); + u32_t random = rand()%(key+1); + label_most = label_of_same_times[random]; + } + + //init + for(unsigned int i=0;iget_out_edge(tmp_vert_id, i, edge); + label_times[label[edge.dest_vert]] = 0; + } + + if(label[tmp_vert_id] != label_most){ + label[tmp_vert_id] = label_most; + convergence = false; + }else{ + label[tmp_vert_id] = label[tmp_vert_id]; + } + + tmp_vert_id++; + } + } + + //statistics + u32_t label_s[gen_config.max_vert_id+1]; + int count = 0; + memset(label_s, -1, (gen_config.max_vert_id+1) * sizeof(int)); + for(u32_t i=gen_config.min_vert_id;i, cc_vert_attr, cc_vert_attr, T> *eng; (*(eng = new fog_engine, cc_vert_attr, cc_vert_attr, T>(TARGET_ENGINE)))(); delete eng; + }else if (prog_name == "lpa"){ + PRINT_DEBUG("lpa starts!\n"); + int check = access(gen_config.in_edge_file_name.c_str(), F_OK); + if(-1 ==check ) + { + PRINT_ERROR("in_edge file doesn't exit or '-i' is false!\n"); + } + + LPA_program * lpa = new LPA_program; + lpa->run(); + }else if (prog_name == "lpa_async"){ + PRINT_DEBUG("lpa_async starts!\n"); + int check = access(gen_config.in_edge_file_name.c_str(), F_OK); + if(-1 ==check ) + { + PRINT_ERROR("in_edge file doesn't exit or '-i' is false!\n"); + } + + LPA_async_program * lpa_async = new LPA_async_program; + lpa_async->run(); + }else if (prog_name == "greedy_coloring"){ + PRINT_DEBUG("greedy coloring starts!\n"); + int check = access(gen_config.in_edge_file_name.c_str(), F_OK); + if(-1 ==check ) + { + PRINT_ERROR("in_edge file doesn't exit or '-i' is false!\n"); + } + + Greedy_coloring * g_c = new Greedy_coloring; + g_c->run(); + }else if (prog_name == "pagerank_s"){ + PRINT_DEBUG("pagerank_simple starts!\n"); + Pagerank_simple::iteration_times = vm["pagerank_s::niters"].as(); + unsigned int iteration_times = Pagerank_simple::iteration_times; + if(10 == iteration_times) + { + std::cout<<"You didn't input the pagerank_s::niters or you chose the default value:10, the algorithm will run 10 iterations."< * p_s = new Pagerank_simple; + p_s->run(); + }else if (prog_name == "pagerank_m"){ + PRINT_DEBUG("pagerank_matrix starts!\n"); + Pagerank_matrix::iteration_times = vm["pagerank_m::niters"].as(); + unsigned int iteration_times = Pagerank_matrix::iteration_times; + if(10 == iteration_times) + { + std::cout<<"You didn't input the pagerank_m::niters or you chose the default value:10, the algorithm will run 10 iterations."< * p_m = new Pagerank_matrix; + p_m->run(); }else if (prog_name == "demo"){ PRINT_DEBUG("demo starts!\n"); diff --git a/fogsrc/pagerank_matrix.cpp b/fogsrc/pagerank_matrix.cpp new file mode 100644 index 0000000..1250892 --- /dev/null +++ b/fogsrc/pagerank_matrix.cpp @@ -0,0 +1,102 @@ +/************************************************************************************************** + * Authors: + * lilang + * + * Routines: + * pagerank_matrix: + * 1.use matrix to calculate the pagerank, don't need iteration + * 2.http://segmentfault.com/a/1190000000711128#articleHeader0 + * 3.may not be useful for the transition probability that is nearly to 1 or randomly jumping probability that is nearly to 0 + * 4.cannot process big graph like soc-Livejournal now(2015.10.23)( error: bad_alloc ) + *************************************************************************************************/ + +#ifndef __PAGERANK_MATRIX_H__ +#define __PAGERANK_MATRIX_H__ + +#include "../headers/types.hpp" +#include "../headers/fog_engine.hpp" +#include "../headers/index_vert_array.hpp" +#include "print_debug.hpp" +#include "limits.h" + +#include + +using namespace Eigen; +using namespace std; + +template +class Pagerank_matrix{ + public: + static u32_t iteration_times; + public: + void run() + { + index_vert_array * vert_index = new index_vert_array; + + u32_t min_vert_id = gen_config.min_vert_id; + u32_t max_vert_id = gen_config.max_vert_id; + u32_t total = max_vert_id - min_vert_id + 1; + u32_t v_num_edges = 0; + struct in_edge in_edge; + + float p = 0.85; + float delta = (1-p)/total; + + std::cout << "init matrix" << std::endl; + MatrixXf m_g(total,total);//the link matrix + MatrixXf m_d(total,total);//the reciprocal of Cj's out-deg, it is a diagonal matrix + MatrixXf m_i(total,total);//it is a identity matrix + MatrixXf m_temp(total,total); + MatrixXf m_temp_inv(total,total); + VectorXf v(total); + float v_sum = 0; + + m_g = MatrixXf::Zero(total,total); + m_d = MatrixXf::Zero(total,total); + m_i.setIdentity(total,total); + m_temp = MatrixXf::Zero(total,total); + //v = VectorXf::Ones(); + for(u32_t i=0;inum_edges(i+min_vert_id, IN_EDGE); + for(u32_t j=0;jget_in_edge(i+min_vert_id, j, in_edge); + m_g(i,in_edge.src_vert - min_vert_id) = 1; + } + + v_num_edges = vert_index->num_edges(i+min_vert_id, OUT_EDGE); + if(v_num_edges != 0) + m_d(i,i) = 1.0/v_num_edges; + else + std::cout << "the " << i+min_vert_id << " vertex's out-degree is zero"; + } + + std::cout << "m_i\n" << m_i << std::endl; + std::cout << "m_g\n" << m_g << std::endl; + std::cout << "m_d\n" << m_d << std::endl; + + m_temp = m_i - p * m_g * m_d; + m_temp_inv /= delta; + m_temp_inv = m_temp.inverse(); + v = m_temp_inv * v; + //normalizing, ensure the E * v = 1 + for(u32_t i=0;i +class Pagerank_simple{ + public: + static u32_t iteration_times; + public: + void run() + { + index_vert_array * vert_index = new index_vert_array; + + u32_t min_vert_id = gen_config.min_vert_id; + u32_t max_vert_id = gen_config.max_vert_id; + u32_t total = max_vert_id - min_vert_id + 1; + + float p = 0.85; + float delta = (1-p)/total; + + //every page's pagerank value, old and new + float **pr = new float*[2]; + pr[0] = new float[total]; + pr[1] = new float[total]; + for(u32_t i=0;inum_edges(tmp_vert_id, IN_EDGE); + //in_deg[tmp_vert_id - min_vert_id] = v_num_edges; + v_num_edges = vert_index->num_edges(tmp_vert_id, OUT_EDGE); + out_deg[tmp_vert_id - min_vert_id] = v_num_edges; + + tmp_vert_id++; + } + + //the core of pagerank algorithm + //this is a push model + u32_t iters = 0; + while(iters < iteration_times){ + tmp_vert_id = min_vert_id; + while(tmp_vert_id <= max_vert_id){ + if(out_deg[tmp_vert_id-min_vert_id] == 0)//if the page's out-degree is zero, every page shares it + for(u32_t i=0;iget_out_edge(tmp_vert_id, i, out_edge); + pr[reverse][out_edge.dest_vert - min_vert_id] += pr[1-reverse][tmp_vert_id - min_vert_id] / out_deg[tmp_vert_id-min_vert_id]; + } + + tmp_vert_id++; + } + //take transition probability into consideration + for(u32_t i=0;i