Getting Started with Apache Flink

JetQin90

Introduction#

Apache Flink is a distributed processing engine that can handle both unbounded and bounded data streams, and it was designed from the outset to run in a wide variety of cluster environments.

An unbounded stream has a defined start but no defined end: because there is no terminating boundary, we cannot wait for all input to arrive before processing it; instead, events are processed continuously, in the order in which they are produced. A bounded stream has both a defined start and a defined end, so processing can wait until all of the data has been received before any computation runs. The input need not arrive in any particular order, because once you hold the complete data set you can simply sort it.
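To make the distinction concrete, below is a minimal Java sketch against the Flink 1.8 API contrasting the two kinds of sources; the class name and the host/port values are illustrative only.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedVsUnbounded {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded: a fixed, finite collection -- every element is known up front,
        // so in principle the complete input can be gathered (and sorted) first.
        DataStream<String> bounded = env.fromElements("apache", "flink", "stream");

        // Unbounded: a socket keeps producing lines with no defined end, so
        // results must be computed continuously as events arrive.
        // (If actually executed, this needs a process listening on localhost:9000.)
        DataStream<String> unbounded = env.socketTextStream("localhost", 9000, "\n");

        bounded.print();
        env.execute("bounded-vs-unbounded");
    }
}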

Installing Flink#

To install Flink, simply download the binary package and unpack it into a local directory:

$ cd ~/Downloads        # Go to download directory
$ curl -O <flink-1.8.0 binary download URL>
$ tar xzf flink-*.tgz   # Unpack the downloaded archive
$ cd flink-1.8.0
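Note that Flink 1.8 requires a working Java 8 installation; it is worth checking java -version before going further.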
Starting the Flink Cluster#

Once Flink is installed, start a local cluster with the following command:

bin/start-cluster.sh

Once the cluster is up, you can confirm it by opening the Flink Dashboard, which by default is served at http://localhost:8081.

Writing a Flink Job#

Next, we write a simple Flink job that counts words.

Step 1: Create a Java application with gradle init

gradle init

Starting a Gradle Daemon (subsequent builds will be faster)

Select type of project to generate:
  1: basic
  2: cpp-application
  3: cpp-library
  4: groovy-application
  5: groovy-library
  6: java-application
  7: java-library
  8: kotlin-application
  9: kotlin-library
  10: scala-library
Enter selection (default: basic) [1..10] 6

Select build script DSL:
  1: groovy
  2: kotlin
Enter selection (default: groovy) [1..2] 1

Select test framework:
  1: junit
  2: testng
  3: spock
Enter selection (default: junit) [1..3] 1

Project name (default: flink-example):
Source package (default: flink.example): io.github.jetqin

BUILD SUCCESSFUL in 40s
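If the build succeeds, gradle init should have generated a conventional project layout roughly like the following (exact files vary slightly across Gradle versions):

flink-example/
├── build.gradle
├── settings.gradle
└── src
    ├── main/java/io/github/jetqin/App.java
    └── test/java/io/github/jetqin/AppTest.java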
Step 2: Open build.gradle for editing (here with IntelliJ IDEA)
idea build.gradle

Add the following configuration and dependencies:

plugins {
    // Apply the java plugin to add support for Java
    id 'java'
    // Apply the application plugin to add support for building an application
    id 'application'
}

repositories {
    // Use jcenter for resolving your dependencies.
    // You can declare any Maven/Ivy/file repository here.
    jcenter()
}

ext {
    set('flinkVersion', '1.8.0')
}

// Specify the main class to run
jar {
    manifest {
        attributes 'Main-Class': 'io.github.jetqin.App'
    }
}

dependencies {
    // add flink dependencies
    implementation "com.google.guava:guava:27.0.1-jre"
    implementation "org.apache.flink:flink-java:${flinkVersion}"
    implementation "org.apache.flink:flink-streaming-java_2.11:${flinkVersion}"
    implementation "org.apache.flink:flink-clients_2.11:${flinkVersion}"
    implementation "org.apache.flink:flink-connector-wikiedits_2.11:${flinkVersion}"

    // Use JUnit test framework
    testImplementation 'junit:junit:4.12'
}

// Define the main class for the application
mainClassName = 'io.github.jetqin.App'
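Both the jar manifest's Main-Class attribute and the mainClassName property point at the same entry point: the former lets flink run discover the main class inside the jar, while the latter is used by the Gradle application plugin (e.g. for gradle run). If the manifest entry were omitted, the class could instead be passed explicitly to flink run via its -c option.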

Step 3: Write the Java source file

package io.github.jetqin;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class App {

    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
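A few details worth noting: timeWindow(Time.seconds(5), Time.seconds(1)) defines a sliding window, so every second the job emits the word counts accumulated over the previous five seconds. And keyBy("word") addresses the field by name, which works because WordWithCount is a public POJO with public fields and a no-argument constructor.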
Running the Flink Job#

To run the Flink job, we first need to package the Java application into an executable jar.

Step 1: Package the application and submit it with the flink command

gradle clean jar
flink run build/libs/flink-example.jar --port 9000
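Here gradle clean jar rebuilds build/libs/flink-example.jar (named after the Gradle project created above), and flink run submits it to the running cluster; the --port 9000 argument is read by ParameterTool in App and tells the job which local socket to consume.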
Step 2: Simulate a client that continuously types words
nc -l 9000
flink
i love apache flink
flink is a powerful platform
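If nc complains about the syntax, note that listen mode differs between variants: BSD netcat (the macOS default) accepts nc -l 9000, while traditional GNU-style netcat expects nc -l -p 9000. Every line typed into this session is split on whitespace by the job's flatMap and flows into the windowed count.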

Output#

The output can be inspected with the following command:

tail -f log/flink-*-taskexecutor-*.out

The output can also be viewed in the dashboard, under the stdout of the task manager that runs the job.
