Skip to content

Commit

Permalink
Merge pull request #41 from ylow/master
Browse files Browse the repository at this point in the history
Improved mpirsync. Engine now permits arbitrary signals in context
  • Loading branch information
Yucheng Low committed Sep 19, 2013
2 parents fcb6a4b + 69f5726 commit dfdffb6
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 31 deletions.
2 changes: 1 addition & 1 deletion scripts/mpirsync
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
src_path=$(hostname):$PWD
dest_path=$PWD
mpiexec.openmpi -hostfile ~/machines -nolocal -pernode mkdir -p $dest_path
mpiexec.openmpi -hostfile ~/machines -nolocal -pernode rsync -avz --exclude '*.make' --exclude '*.cmake' $src_path/ $dest_path
mpiexec.openmpi -hostfile ~/machines -nolocal -pernode rsync -e 'ssh -o StrictHostKeyChecking=no' -avz --exclude '*.make' --exclude '*.cmake' $src_path/ $dest_path
15 changes: 4 additions & 11 deletions src/graphlab/engine/async_consistent_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -653,21 +653,14 @@ namespace graphlab {
if (force_stop) return;
if (graph.is_master(gvid)) {
internal_signal(graph.vertex(gvid), message);
} else {
procid_t proc = graph.master(gvid);
rmi.remote_call(proc, &async_consistent_engine::internal_signal_gvid,
gvid, message);
}
}



void internal_signal_broadcast(vertex_id_type gvid,
const message_type& message = message_type()) {
for (size_t i = 0;i < rmi.numprocs(); ++i) {
rmi.remote_call(i, &async_consistent_engine::internal_signal_gvid,
gvid, message);
}
} // end of signal_broadcast



void rpc_internal_stop() {
force_stop = true;
termination_reason = execution_status::FORCED_ABORT;
Expand Down
19 changes: 9 additions & 10 deletions src/graphlab/engine/synchronous_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -728,13 +728,12 @@ namespace graphlab {

/**
* \brief Called by the context to signal an arbitrary vertex.
* This must be done by finding the owner of that vertex.
*
* @param [in] gvid the global vertex id of the vertex to signal
* @param [in] message the message to send to that vertex.
*/
void internal_signal_broadcast(vertex_id_type gvid,
const message_type& message = message_type());
void internal_signal_gvid(vertex_id_type gvid,
const message_type& message = message_type());

/**
* \brief This function tests if this machine is the master of
Expand Down Expand Up @@ -1190,13 +1189,13 @@ namespace graphlab {

template<typename VertexProgram>
void synchronous_engine<VertexProgram>::
internal_signal_broadcast(vertex_id_type gvid, const message_type& message) {
for (size_t i = 0; i < rmi.numprocs(); ++i) {
if(i == rmi.procid()) internal_signal_rpc(gvid, message);
else rmi.remote_call(i, &synchronous_engine<VertexProgram>::internal_signal_rpc,
gvid, message);
}
} // end of internal_signal_broadcast
internal_signal_gvid(vertex_id_type gvid, const message_type& message) {
procid_t proc = graph.master(gvid);
if(proc == rmi.procid()) internal_signal_rpc(gvid, message);
else rmi.remote_call(proc,
&synchronous_engine<VertexProgram>::internal_signal_rpc,
gvid, message);
}

template<typename VertexProgram>
void synchronous_engine<VertexProgram>::
Expand Down
31 changes: 30 additions & 1 deletion src/graphlab/engine/warp_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ namespace warp {
* and is used to encode iterative computation. Typically a vertex
* program will signal neighboring vertices during the scatter
* phase. A vertex program may choose to signal neighbors on when
* changes made during the previos phases break invariants or warrant
* changes made during the previous phases break invariants or warrant
* future computation on neighboring vertices.
*
* The signal function takes two arguments. The first is mandatory
Expand All @@ -390,6 +390,31 @@ namespace warp {
}



/**
* \brief Signal an arbitrary vertex ID with a particular message.
*
* This function is an essential part of the GraphLab abstraction
* and is used to encode iterative computation. Typically a vertex
* program will signal neighboring vertices during the scatter
* phase. A vertex program may choose to signal neighbors on when
* changes made during the previous phases break invariants or warrant
* future computation on neighboring vertices.
*
* The signal function takes two arguments. The first is mandatory
* and specifies which vertex to signal. The second argument is
* optional and is used to send a message. If no message is
* provided then the default message is used.
*
* \param vertex [in] The vertex to send the message to
* \param message [in] The message to send, defaults to message_type().
*/
void signal(vertex_id_type gvid,
const message_type& message = message_type()) {
engine.internal_signal_gvid(gvid, message);
}


/**
* \internal
* \brief Flags that this vertex was synchronized.
Expand Down Expand Up @@ -792,6 +817,10 @@ namespace warp {
if (force_stop) return;
if (graph.is_master(gvid)) {
internal_signal(graph.vertex(gvid), message);
} else {
procid_t proc = graph.master(gvid);
rmi.remote_call(proc, &warp_engine::internal_signal,
gvid, message);
}
}

Expand Down
12 changes: 10 additions & 2 deletions src/graphlab/graph/distributed_graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
#include <graphlab/graph/ingress/sharding_constraint.hpp>
#include <graphlab/graph/ingress/distributed_constrained_random_ingress.hpp>

#include <graphlab/graph/graph_hash.hpp>

#include <graphlab/util/hopscotch_map.hpp>

Expand Down Expand Up @@ -2783,9 +2784,16 @@ namespace graphlab {
* master vertex on this machine and false otherwise.
*/
bool is_master(vertex_id_type vid) const {
typename hopscotch_map_type::const_iterator iter = vid2lvid.find(vid);
return (iter != vid2lvid.end()) && l_is_master(iter->second);
const procid_t owning_proc = graph_hash::hash_vertex(vid) % rpc.numprocs();
return (owning_proc == rpc.procid());
}


procid_t master(vertex_id_type vid) const {
const procid_t owning_proc = graph_hash::hash_vertex(vid) % rpc.numprocs();
return owning_proc;
}

/** \internal
* \brief Returns true if the provided local vertex ID is a master vertex.
* Returns false otherwise.
Expand Down
10 changes: 4 additions & 6 deletions src/graphlab/vertex_program/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,13 @@ namespace graphlab {
}

/**
* Send a message to a vertex ID.
* \warning This function will be slow since the current machine do
* not know the location of the vertex ID.
* \warning This may be unreliable. signals issued near to engine
* termination may be lost.
* Send a message to an arbitrary vertex ID.
* \warning If sending to neighboring vertices, the \ref signal()
* function is more efficientas it permits sender side message combining.
*/
void signal_vid(vertex_id_type vid,
const message_type& message = message_type()) {
engine.internal_signal_broadcast(vid, message);
engine.internal_signal_gvid(vid, message);
}


Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ add_graphlab_executable(mini_web_server mini_web_server.cpp)
add_graphlab_executable(test_vertex_set test_vertex_set.cpp)

add_test(test_vertex_set test_vertex_set)
add_graphlab_executable(arbitrary_signal_test arbitrary_signal_test.cpp)


add_graphlab_executable(sort_test sort_test.cpp)
Expand Down
85 changes: 85 additions & 0 deletions tests/arbitrary_signal_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright (c) 2009 Carnegie Mellon University.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*
* For more about this software visit:
*
* http://www.graphlab.ml.cmu.edu
*
*/

#include <vector>
#include <algorithm>
#include <iostream>


// #include <cxxtest/TestSuite.h>

#include <graphlab.hpp>

typedef graphlab::distributed_graph<int,int> graph_type;


class test_uf:
public graphlab::ivertex_program<graph_type, int>,
public graphlab::IS_POD_TYPE {
public:
edge_dir_type
gather_edges(icontext_type& context,
const vertex_type& vertex) const {
return graphlab::NO_EDGES;
}
void apply(icontext_type& context, vertex_type& vertex,
const gather_type& total) {
if (vertex.id() < 99) context.signal_vid(vertex.id() + 1);
}
edge_dir_type scatter_edges(icontext_type& context,
const vertex_type& vertex) const {
return graphlab::NO_EDGES;
}
}; // end of count neighbors


typedef graphlab::async_consistent_engine<test_uf> agg_engine_type;
//typedef graphlab::synchronous_engine<test_uf> agg_engine_type;

int main(int argc, char** argv) {
global_logger().set_log_level(LOG_WARNING);
///! Initialize control plain using mpi
graphlab::mpi_tools::init(argc, argv);
graphlab::dc_init_param rpc_parameters;
graphlab::init_param_from_mpi(rpc_parameters);
graphlab::distributed_control dc(rpc_parameters);

graphlab::command_line_options clopts("Test code.");
clopts.set_scheduler_type("queued_fifo");
std::cout << "Creating a powerlaw graph" << std::endl;
graph_type graph(dc, clopts);
graph.load_synthetic_powerlaw(100);


typedef agg_engine_type engine_type;
engine_type engine(dc, graph, clopts);
engine.signal(0);
engine.start();

ASSERT_EQ(engine.num_updates(), 100);
graphlab::mpi_tools::finalize();
} // end of main





0 comments on commit dfdffb6

Please sign in to comment.