if (args.length > 0) {
bsp.setNumBspTask(Integer.parseInt(args[0]));
} else {
// Set to maximum
bsp.setNumBspTask(cluster.getMaxTasks());
}
long startTime = System.currentTimeMillis();
if (bsp.waitForCompletion(true)) {
printOutput(conf);
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0
+ " seconds");
}
}
}
将工程Export成Jar文件,发到集群上运行。运行命令:
$HAMA_HOME/bin/hama jar jarName.jar
输出:
Current supersteps number: 0()
Current supersteps number: 4()
The total number of supersteps: 4(总超级步数目)
Counters: 8(一共8个计数器,如下8个。所有计数器列表待完善)
org.apache.hama.bsp.JobInProgress$JobCounter
SUPERSTEPS=4(BSPMaster超级步数目)
LAUNCHED_TASKS=3(共多少个task)
org.apache.hama.bsp.BSPPeerImpl$PeerCounter
SUPERSTEP_SUM=12(总共的超级步数目,task数目*BSPMaster超级步数目)
MESSAGE_BYTES_TRANSFERED=48(传输信息字节数)
TIME_IN_SYNC_MS=657(同步消耗时间)
TOTAL_MESSAGES_SENT=6(发送信息条数)
TOTAL_MESSAGES_RECEIVED=6(接收信息条数)
TASK_OUTPUT_RECORDS=2(任务输出记录数)
PageRank例子:
package pi;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.graph.AverageAggregator;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;
/**
* Real pagerank with dangling node contribution.
*/
public class PageRank {
public static class PageRankVertex extends
Vertex<Text, NullWritable, DoubleWritable> {
static double DAMPING_FACTOR = 0.85;
static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
@Override
public void setup(HamaConfiguration conf) {
String val = conf.get("hama.pagerank.alpha");
if (val != null) {
DAMPING_FACTOR = Double.parseDouble(val);
}
val = conf.get("hama.graph.max.convergence.error");
if (val != null) {
MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
}
}
@Override
public void compute(Iterable<DoubleWritable> messages)
throws IOException {
// initialize this vertex to 1 / count of global vertices in this
// graph
if (this.getSuperstepCount() == 0) {
this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
} else if (this.getSuperstepCount() >= 1) {
double sum = 0;
for (DoubleWritable msg : messages) {
sum += msg.get();
}
double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
this.setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
}