Hadoop map reduce 过程获取环境变量

Hadoop任务执行过程中,在每一个map节点或者reduce节点能获取一下环境变量,利用这些变量可以为特殊的需求服务,例如:获取当前map节点处理的数据文件的路径。

hadoop是java实现的,利用java可以很方便的获取相关环境变量,其内部包含在Context和MRJobConfig中(hadoop版本不一样,可能会有区别,我的hadoop是0.21)。

举例:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;


public class MergeDaysMapper extends Mapper<LongWritable, Text, Text, Text> {

private String inputFile = null;

public void setup(Context context)
{
System.err.println("[STARTS TO GET PARAMETERS OF THIS JOB]");
Path input = ((FileSplit)context.getInputSplit()).getPath();
inputFile = input.toString();
System.err.println("Input: "+ input.toString());
System.out.println("Input: "+ input.getName());
System.out.println("MAP_INPUT_FILE: " + MRJobConfig.MAP_INPUT_FILE);
System.out.println("MAP_INPUT_PATH:"+ MRJobConfig.MAP_INPUT_PATH);
System.out.println("MAP_INPUT_START:"+ MRJobConfig.MAP_INPUT_START);
System.err.println("JOB_NAME" + MRJobConfig.JOB_NAME);
System.out.println("[FINISHED GETTING PARAMETERS OF THIS JOB]");
}
    
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
if (null == inputFile)
   context.write(new Text("key"), new Text("inputFile"));
else
context.write(new Text("key"), new Text(inputFile));
}
}


同时,在streaming任务中也有同样的需求,需要获取相关环境变量,查过别人的资料,如下:

{{

streaming框架通过设置环境变量的方式给mapper、reducer程序传递配置信息。常用的环境变量如下:

HADOOP_HOME

 

计算节点上配置的Hadoop路径

 

LD_LIBRARY_PATH

 

计算节点上加载库文件的路径列表

 

PWD

 

当前工作目录

 

dfs_block_size

 

当前设置的HDFS文件块大小

 

map_input_file

 

mapper正在处理的输入文件路径

 

mapred_job_id

 

作业ID

 

mapred_job_name

 

作业名

 

mapred_tip_id

 

当前任务的第几次重试

 

mapred_task_id

 

任务ID

 

mapred_task_is_map

 

当前任务是否为map

 

mapred_output_dir

 

计算输出路径

 

mapred_map_tasks

 

计算的map任务数

 

mapred_reduce_tasks

 

计算的reduce任务数

 

}}


自己测试了一下,不对,又是版本问题,查了#How+do+I+get+the+JobConf+variables+in+a+streaming+job%27s+mapper%2Freducer%3F

解决如下:

Name Type Description
mapreduce.job.id   String   The job id  
mapreduce.job.jar   String   job.jar location in job directory  
mapreduce.job.local.dir   String   The job specific shared scratch space  
mapreduce.task.id   String   The task id  
mapreduce.task.attempt.id   String   The task attempt id  
mapreduce.task.ismap   boolean   Is this a map task  
mapreduce.task.partition   int   The id of the task within the job  
mapreduce.map.input.file   String   The filename that the map is reading from  
mapreduce.map.input.start   long   The offset of the start of the map input split  
mapreduce.map.input.length   long   The number of bytes in the map input split  
mapreduce.task.output.dir   String   The task's temporary output directory  
相关参数在streaming中“."用”_"代替即可。

例子:

#!/bin/sh


while read line
do
  echo "$line"
  echo $mapreduce_map_input_file
done


测试通过

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/3bedd9ccc2b038ddc515b9c854cc3f35.html