Apache Hama安装部署(4)

// if we have not reached our global error yet, then proceed.
            DoubleWritable globalError = this.getAggregatedValue(0);
            if (globalError != null && this.getSuperstepCount() > 2
                    && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
                voteToHalt();
                return;
            }

// in each superstep we are going to send a new rank to our
            // neighbours
            sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
                    / this.getEdges().size()));
        }
    }

public static GraphJob createJob(String[] args, HamaConfiguration conf)
            throws IOException {
        GraphJob pageJob = new GraphJob(conf, PageRank.class);
        pageJob.setJobName("Pagerank");

pageJob.setVertexClass(PageRankVertex.class);
        pageJob.setInputPath(new Path(args[0]));
        pageJob.setOutputPath(new Path(args[1]));

// set the defaults
        pageJob.setMaxIteration(30);
        pageJob.set("hama.pagerank.alpha", "0.85");
        // reference vertices to itself, because we don't have a dangling node
        // contribution here
        pageJob.set("hama.graph.self.ref", "true");
        pageJob.set("hama.graph.max.convergence.error", "1");

if (args.length == 3) {
            pageJob.setNumBspTask(Integer.parseInt(args[2]));
        }

// error
        pageJob.setAggregatorClass(AverageAggregator.class);

// Vertex reader
        pageJob.setVertexInputReaderClass(PagerankTextReader.class);

pageJob.setVertexIDClass(Text.class);
        pageJob.setVertexValueClass(DoubleWritable.class);
        pageJob.setEdgeValueClass(NullWritable.class);

pageJob.setPartitioner(HashPartitioner.class);
        pageJob.setOutputFormat(TextOutputFormat.class);
        pageJob.setOutputKeyClass(Text.class);
        pageJob.setOutputValueClass(DoubleWritable.class);
        return pageJob;
    }

private static void printUsage() {
        System.out.println("Usage: <input> <output> [tasks]");
        System.exit(-1);
    }

public static class PagerankTextReader
            extends
            VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {

@Override
        public boolean parseVertex(LongWritable key, Text value,
                Vertex<Text, NullWritable, DoubleWritable> vertex)
                throws Exception {
            String[] split = value.toString().split("\t");
            for (int i = 0; i < split.length; i++) {
                if (i == 0) {
                    vertex.setVertexID(new Text(split[i]));
                } else {
                    vertex.addEdge(new Edge<Text, NullWritable>(new Text(
                            split[i]), null));
                }
            }
            return true;
        }

}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/b517929e7d0cb94348a8b038071fbf65.html