前言:
现时同学们对“apache接收不到lua数据”大体比较讲究,你们都想要剖析一些“apache接收不到lua数据”的相关文章。那么小编也在网络上汇集了一些有关“apache接收不到lua数据””的相关内容,希望姐妹们能喜欢,咱们快快来学习一下吧!简介#
Apache Flink 是一个可以处理无界和有界数据流的分布式处理引擎,在设计之初,它就被设计成可以运行在各种各样的集群环境中。
无界数据流是指数据产生有一个开始但没有终止条件,因为它没有终止的边界,所以处理的过程中我们不能等待所有数据输入完毕才去处理,处理时是通过产生数据的事件,按照一定的顺序去持续处理流中的数据。有界数据是指定义了数据流的开始和结束,处理有界数据时,可以等到所有数据都获取到的情况下再进行后续计算,不需要这些数据是按照一定的顺序进入到处理队列,因为当你获得这些数据时你可以对这些数据进行排序。安装Flink#
安装Flink只需要下载其二进制包,并解压到本地目录即可
$ curl cd ~/Downloads # Go to download directory$ tar xzf flink-*.tgz # Unpack the downloaded archive$ cd flink-1.8.0启动Flink 集群#
安装好Flink之后可以通过如下命令启动Flink集群
bin/start-cluster.sh
启动之后可以通过查看Flink Dashboad如下图
编写Flink 任务#
接下来我们编写一个简单的Flink 任务用来统计单词的个数.
Step 1 通过Gradle init来创建一个java应用
gradle initStarting a Gradle Daemon (subsequent builds will be faster)Select type of project to generate: 1: basic 2: cpp-application 3: cpp-library 4: groovy-application 5: groovy-library 6: java-application 7: java-library 8: kotlin-application 9: kotlin-library 10: scala-libraryEnter selection (default: basic) [1..10] 6Select build script DSL: 1: groovy 2: kotlinEnter selection (default: groovy) [1..2] 1Select test framework: 1: junit 2: testng 3: spockEnter selection (default: junit) [1..3] 1Project name (default: flink-example):Source package (default: flink.example): io.github.jetqinBUILD SUCCESSFUL in 40sStep 2 通过idea编辑
idea build.gradle
添加如下依赖
plugins { // Apply the java plugin to add support for Java id 'java' // Apply the application plugin to add support for building an application id 'application'}repositories { // Use jcenter for resolving your dependencies. // You can declare any Maven/Ivy/file repository here. jcenter()}ext { set('flinkVersion', '1.8.0')}// 指定运行的主类jar { manifest { attributes 'Main-Class': 'io.github.jetqin.App' }}dependencies { // add flink dependencies implementation "com.google.guava:guava:27.0.1-jre" implementation "org.apache.flink:flink-java:${flinkVersion}" implementation "org.apache.flink:flink-streaming-java_2.11:${flinkVersion}" implementation "org.apache.flink:flink-clients_2.11:${flinkVersion}" implementation "org.apache.flink:flink-connector-wikiedits_2.11:${flinkVersion}" // Use JUnit test framework testImplementation 'junit:junit:4.12'}// Define the main class for the applicationmainClassName = 'io.github.jetqin.App'
Step 3: 编写Java文件
package io.github.jetqin;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;public class App { public static void main(String[] args) throws Exception{ // the port to connect to final int port; try { final ParameterTool params = ParameterTool.fromArgs(args); port = params.getInt("port"); } catch (Exception e) { System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'"); return; } // get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket DataStream<String> text = env.socketTextStream("localhost", port, "\n"); // parse the data, group it, window it, and aggregate the counts DataStream<WordWithCount> windowCounts = text .flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String value, Collector<WordWithCount> out) { for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word") .timeWindow(Time.seconds(5), Time.seconds(1)) .reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count + b.count); } }); // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); } // Data type for words with count public static class WordWithCount { public String word; public long count; public WordWithCount() {} public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return word + " : " + count; } }}运行Flink 任务#
接下来运行Flink任务,首先需要将编写的java应用打包成可执行文件
Step 1 打包应用并通过flink 命令执行此任务
gradle clean jarflink run build/libs/flink-example.jar --port 9000Step 2 模拟一个客户端,不间断输入单词
nc -l 9000flinki love apache flinkflink is a powerful platform
输出结果#
查看输出结果可以通过如下命令查看
tail -f log/flink-*-taskexecutor-*.out
也可以在dashboard中job manager中查看
标签: #apache接收不到lua数据