Apache Hama安装部署(3)

if (args.length > 0) {
            bsp.setNumBspTask(Integer.parseInt(args[0]));
        } else {
            // Set to maximum
            bsp.setNumBspTask(cluster.getMaxTasks());
        }

long startTime = System.currentTimeMillis();

if (bsp.waitForCompletion(true)) {
            printOutput(conf);
            System.out.println("Job Finished in "
                    + (System.currentTimeMillis() - startTime) / 1000.0
                    + " seconds");
        }
    }

}

将工程导出(Export)为Jar文件,上传到集群上运行。运行命令如下:

$HAMA_HOME/bin/hama  jar  jarName.jar

输出:

Apache Hama安装部署

Current supersteps number: 0(当前超级步数目)

Current supersteps number: 4(当前超级步数目)

The total number of supersteps: 4(总超级步数目)

Counters: 8(一共8个计数器,如下8个。所有计数器列表待完善)

org.apache.hama.bsp.JobInProgress$JobCounter

SUPERSTEPS=4(BSPMaster超级步数目)

LAUNCHED_TASKS=3(共多少个task)

org.apache.hama.bsp.BSPPeerImpl$PeerCounter

SUPERSTEP_SUM=12(总共的超级步数目,task数目*BSPMaster超级步数目)

MESSAGE_BYTES_TRANSFERED=48(传输信息字节数)

TIME_IN_SYNC_MS=657(同步消耗时间)

TOTAL_MESSAGES_SENT=6(发送信息条数)

TOTAL_MESSAGES_RECEIVED=6(接收信息条数)

TASK_OUTPUT_RECORDS=2(任务输出记录数)

PageRank例子:

package pi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.graph.AverageAggregator;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;

/**
 * Real pagerank with dangling node contribution.
 */
public class PageRank {

public static class PageRankVertex extends
            Vertex<Text, NullWritable, DoubleWritable> {

static double DAMPING_FACTOR = 0.85;
        static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
       
        @Override
        public void setup(HamaConfiguration conf) {
            String val = conf.get("hama.pagerank.alpha");
            if (val != null) {
                DAMPING_FACTOR = Double.parseDouble(val);
            }
            val = conf.get("hama.graph.max.convergence.error");
            if (val != null) {
                MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
            }
        }

@Override
        public void compute(Iterable<DoubleWritable> messages)
                throws IOException {
            // initialize this vertex to 1 / count of global vertices in this
            // graph
            if (this.getSuperstepCount() == 0) {
                this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
            } else if (this.getSuperstepCount() >= 1) {
                double sum = 0;
                for (DoubleWritable msg : messages) {
                    sum += msg.get();
                }
                double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
                this.setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
            }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/b517929e7d0cb94348a8b038071fbf65.html