大数据HelloWorld-Flink实现WordCount(2)
按照提示输入相关信息,即可生成最终的项目。
├── pom.xml
└── src
└── main
├── resources
│ └── log4j.properties
└── scala/java
└── org
└── myorg
└── quickstart
├── BatchJob.scala
└── StreamingJob.scala
把工程导入到IDEA中
如果使用Scala的话,那么需要安装Scala的插件。搜索安装同时需要把Scala语言包进行安装。
不知道如何操作可以联系我 微信公号<指尖数虫>。
package jar;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchJob {
public static void main(String[] args) throws Exception {
// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//读取目录下的文件
DataSource<String> data = env.readTextFile("/opt/Server_Packets/log/ServerLog_1_runtime.log");
//把文件中的内容按照空格进行拆分为 word,1 1 是为了能够在下面进行计算.
data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
for (String word : s.split(" ")){
collector.collect(new Tuple2<>(word,1));
}
}
})
// 按照元组中的第1位进行分组
.groupBy(0)
// 分组的元组的计算方式为 value +value 也就是刚才的 同样的词 把 1+1
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t2, Tuple2<String, Integer> t1) throws Exception {
return new Tuple2<>(t1.f0,t1.f1+ t2.f1);
}
})
//输出结果
.print();
}
}
总结
以上所述是小编给大家介绍的大数据HelloWorld-Flink实现WordCount,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对黑区网络网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!
