解决Hadoop JobConf限制为5M的问题

我们的业务是要使用mongodb的Hadoop driver处理输出。我们重写的mongodbInputFormat的时候传递数据的时候是把数据写入conf,然后再从mongoSplitter里面里面从conf里面读出来。比如下面这样:

把数据放入数据conf:

List<Long> tagsUrns =null;
 //tagUrns 赋值.....
 conf.set("tagUrns",
            ObjectSerializer.serialize((Serializable) tagsUrns));

在mapper,reduce,或者mongoSpiltter里拿出conf里的数据:

List<Long> tagUrns = (List<Long>) ObjectSerializer
            .deserialize(context.getConfiguration().get("tagUrns"));

由于conf只能放入boolean、int、string的值,而我需要给hadoop Configuration放入的是list或者其他对象,所以需要用到一个序列化工具类。

序列化工具类代码:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import Java.io.*;

public class ObjectSerializer {

private static final Log log = LogFactory.getLog(ObjectSerializer.class);

public static String serialize(Serializable obj) throws IOException {
    if (obj == null)
        return "";
    try {
        ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
        ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
        objStream.writeObject(obj);
        objStream.close();
        return encodeBytes(serialObj.toByteArray());
    } catch (Exception e) {
        throw new IOException("Serialization error: " + e.getMessage(), e);
    }
}

public static Object deserialize(String str) throws IOException {
    if (str == null || str.length() == 0)
        return null;
    try {
        ByteArrayInputStream serialObj = new ByteArrayInputStream(
                decodeBytes(str));
        ObjectInputStream objStream = new ObjectInputStream(serialObj);
        return objStream.readObject();
    } catch (Exception e) {
        throw new IOException("Deserialization error: " + e.getMessage(), e);
    }
}

public static String encodeBytes(byte[] bytes) {
    StringBuffer strBuf = new StringBuffer();

for (int i = 0; i < bytes.length; i++) {
        strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ((int) 'a')));
        strBuf.append((char) (((bytes[i]) & 0xF) + ((int) 'a')));
    }

return strBuf.toString();
}

public static byte[] decodeBytes(String str) {
    byte[] bytes = new byte[str.length() / 2];
    for (int i = 0; i < str.length(); i += 2) {
        char c = str.charAt(i);
        bytes[i / 2] = (byte) ((c - 'a') << 4);
        c = str.charAt(i + 1);
        bytes[i / 2] += (c - 'a');
    }
    return bytes;
}

}

但是当我放入的数据太大时,运行hadoop任务时报错,错误信息:

错误信息说明hadoop的conf是有限制的,查询下发现限制为5M:

所以当时就懵了。这不从conf传入,好像又拿不到。最后想着能不能从hdfs文件直接读数据文件。但是我的数据必须在mongospliter里面获取数据,而这里只能拿到conf。最后发现hadoop获取FileSystem方式为:

@Resource(name = "hadoopConfiguration")
private Configuration configuration = null;

...........

fileSystem = FileSystem.get(configuration);

更多详情见请继续阅读下一页的精彩内容

推荐阅读

Hadoop 2.0 安装向导 (0.23.x)

Hadoop 1.2.1 单节点安装(Single Node Setup)步骤

CentOS上安装Hadoop

Ubuntu 12.04安装Hadoop

CentOS 6.3 x86_64安装与配置Hadoop-1.0

Hadoop入门--Hadoop2伪分布式安装

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/5278b4c95d82ad402dcbf616a3f987a0.html