Apache Hama安装部署

安装Hama之前,应该首先确保系统中已经安装了Hadoop,本集群使用的版本为hadoop-2.3.0 一、下载及解压Hama文件

下载地址:,选用的是目前最新版本:hama0.6.4。解压之后的存放位置自己设定。

二、修改配置文件

在hama-env.sh文件中加入JAVA_HOME变量(分布式情况下,设为机器的值)

配置hama-site.xml(分布式情况下,所有机器的配置相同)

bsp.master.address为bsp master地址。fs.default.name参数设置成hadoop里namenode的地址。hama.zookeeper.quorum和      hama.zookeeper.property.clientPort两个参数和zookeeper有关,设置成为zookeeper的quorum server即可,单机伪分布式就是本机地址。

Apache Hama安装部署

4. 配置groomservers文件。hama与hadoop具有相似的主从结构,该文件存放从节点的IP地址,每个IP占一行。(分布式情况下只需要配置BSPMaster所在的机器即可)

5. hama0.6.4自带的hadoop核心包为1.2.0,与集群hadoop2.3.0不一致,需要进行替换,具体是在hadoop的lib文件夹下找到hadoop-core-2.3.0*.jar和hadoop-test-2.3.0*.jar,拷贝到hama的lib目录下,并删除hadoop-core-1.2.0.jar和hadoop-test-1.2.0.jar两个文件。

  6. 此时可能会报找不到类的错, 需加入缺失的jar包。(把hadoop开头的jar包和protobuf-java-2.5.0.jar导入到hama/lib下)

三、编写Hama job

在eclipse下新建Java Project,将hama安装时需要的jar包全部导入工程。

官网中计算PI的例子:

package pi;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.FileOutputFormat;
import org.apache.hama.bsp.NullInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.bsp.sync.SyncException;

public class PiEstimator {
    private static Path TMP_OUTPUT = new Path("/tmp/pi-"
            + System.currentTimeMillis());

public static class MyEstimator
            extends
            BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {
        public static final Log LOG = LogFactory.getLog(MyEstimator.class);
        private String masterTask;
        private static final int iterations = 100000;

@Override
        public void bsp(
                BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
                throws IOException, SyncException, InterruptedException {

int in = 0;
            for (int i = 0; i < iterations; i++) {
                double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
                if ((Math.sqrt(x * x + y * y) < 1.0)) {
                    in++;
                }
            }

double data = 4.0 * in / iterations;

peer.send(masterTask, new DoubleWritable(data));
            peer.sync();

if (peer.getPeerName().equals(masterTask)) {
                double pi = 0.0;
                int numPeers = peer.getNumCurrentMessages();
                DoubleWritable received;
                while ((received = peer.getCurrentMessage()) != null) {
                    pi += received.get();
                }

pi = pi / numPeers;
                peer.write(new Text("Estimated value1 of PI is"),
                        new DoubleWritable(pi));
            }
            peer.sync();

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/b517929e7d0cb94348a8b038071fbf65.html