龙空技术网

Java 版Flink WordCount单词统计

逍遥小摩托 166

前言:

如今大家对“java编程统计单词个数”比较关注,很多朋友都想了解一些“java编程统计单词个数”的相关文章。小编在网上收集了一些关于“java编程统计单词个数”的相关内容,希望大家喜欢,下面我们一起来了解一下吧!

描述

使用 nc 命令发送数据,Flink 接收数据,并对接收到的数据统计单词个数(WordCount)。整个过程一共分为四步:1. 按行打印输入数据;2. 拆分单词;3. 将单词转换为 (Key, Value) 形式;4. 对相同单词统计个数。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.opensourceteams</groupId>
	<artifactId>flink-quickstart</artifactId>
	<version>0.1</version>
	<packaging>jar</packaging>

	<name>Flink Quickstart Job</name>
	<!-- NOTE(review): the original project URL was lost; this is the Flink quickstart archetype placeholder. Replace as needed. -->
	<url>http://www.myorganization.org</url>

	<properties>
		<!-- UTF-8 is required: the example package names contain non-ASCII characters. -->
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<flink.version>1.7.1</flink.version>
		<java.version>1.8</java.version>
		<scala.binary.version>2.11</scala.binary.version>
		<maven.compiler.source>${java.version}</maven.compiler.source>
		<maven.compiler.target>${java.version}</maven.compiler.target>
	</properties>

	<repositories>
		<repository>
			<id>apache.snapshots</id>
			<name>Apache Development Snapshot Repository</name>
			<url>https://repository.apache.org/content/repositories/snapshots/</url>
			<releases>
				<enabled>false</enabled>
			</releases>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
	</repositories>

	<dependencies>
		<!-- Apache Flink dependencies -->
		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- Scala suffix comes from scala.binary.version so it stays in sync with the other Flink artifacts. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- NOTE(review): none of the WordCount examples import this connector; drop it if nothing else in the project uses it. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-wikiedits_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- Add connector dependencies here. They must be in the default scope (compile). -->

		<!-- Example:

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		-->

		<!-- Add logging framework, to produce console output when running in the IDE. -->
		<!-- These dependencies are excluded from the application JAR by default. -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.7</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
			<scope>runtime</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>

			<!-- Java Compiler -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>${java.version}</source>
					<target>${java.version}</target>
				</configuration>
			</plugin>

			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.0.0</version>
				<executions>
					<!-- Run shade goal on package phase -->
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<excludes>
									<exclude>org.apache.flink:force-shading</exclude>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<!-- Do not copy the signatures in the META-INF folder.
									Otherwise, this might cause SecurityExceptions when using the JAR. -->
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<!-- NOTE(review): the WordCount jobs live under com.module.flink.example.worldcount.*;
									point this at the job that should be the jar entry point if StreamingJob does not exist. -->
									<mainClass>com.module.flink.example.quickstart.StreamingJob</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>

		<pluginManagement>
			<plugins>

				<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
				<plugin>
					<groupId>org.eclipse.m2e</groupId>
					<artifactId>lifecycle-mapping</artifactId>
					<version>1.0.0</version>
					<configuration>
						<lifecycleMappingMetadata>
							<pluginExecutions>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-shade-plugin</artifactId>
										<versionRange>[3.0.0,)</versionRange>
										<goals>
											<goal>shade</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-compiler-plugin</artifactId>
										<versionRange>[3.1,)</versionRange>
										<goals>
											<goal>testCompile</goal>
											<goal>compile</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
							</pluginExecutions>
						</lifecycleMappingMetadata>
					</configuration>
				</plugin>
			</plugins>
		</pluginManagement>
	</build>

	<!-- This profile helps to make things run out of the box in IntelliJ -->
	<!-- Its adds Flink's core classes to the runtime class path. -->
	<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
	<profiles>
		<profile>
			<id>add-dependencies-for-IDEA</id>

			<activation>
				<property>
					<name>idea.version</name>
				</property>
			</activation>

			<dependencies>
				<dependency>
					<groupId>org.apache.flink</groupId>
					<artifactId>flink-java</artifactId>
					<version>${flink.version}</version>
					<scope>compile</scope>
				</dependency>
				<dependency>
					<groupId>org.apache.flink</groupId>
					<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
					<version>${flink.version}</version>
					<scope>compile</scope>
				</dependency>
			</dependencies>
		</profile>
	</profiles>

</project>

按行打印输入数据

使用 nc 命令输入数据

在本地 1234 端口启动监听服务:

nc -l 1234

一、按行打印输入数据(Java)

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * *  * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package com.module.flink.example.worldcount.n_001_输出流数据;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Run { public static void main(String[] args) throws Exception { final int port = 1234 ; // get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket DataStream<String> text = env.socketTextStream("localhost", port, "\n"); text.print().setParallelism(1); env.execute("输出nc终端数据") ; }}

二、拆分单词

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * *  * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package com.module.flink.example.worldcount.n_002_单词拆分;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Run { static Logger logger = LoggerFactory.getLogger(Run.class); public static void main(String[] args) throws Exception { final int port = 1234 ; // get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket DataStream<String> text = env.socketTextStream("localhost", port, "\n"); DataStream<String> wordDataStream = text.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String s, Collector<String> collector) throws Exception { logger.info("s:{}",s); // for(String word : s.split("\\\\s")){ for(String word : s.split(" ")){ collector.collect(word); } } }) ; wordDataStream.print().setParallelism(1); env.execute("输出nc终端数据") ; }}

三、拆分单词为(Key,value)

WordWithCount.java

package com.module.flink.example.worldcount.n_003_单词拆分keyValue;public class WordWithCount { private String word; private long count; public WordWithCount() { } public WordWithCount(String word, long count) { this.word = word; this.count = count; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public long getCount() { return count; } public void setCount(long count) { this.count = count; } @Override public String toString() { return word + ":" + count; }}
Run.java
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * *  * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package com.module.flink.example.worldcount.n_003_单词拆分keyValue;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Run { static Logger logger = LoggerFactory.getLogger(Run.class); public static void main(String[] args) throws Exception { final int port = 1234 ; // get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket DataStream<String> text = env.socketTextStream("localhost", port, "\n"); DataStream<WordWithCount> wordDataStream = text.flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String s, Collector<WordWithCount> collector) throws Exception { logger.info("输入数据:{}",s); // for(String word : s.split("\\\\s")){ for(String word : s.split(" ")){ //一次数据 collector.collect(new WordWithCount(word,1L)); } } }) ; wordDataStream.print().setParallelism(1); env.execute("输出nc终端数据") ; }}

四、相同单词进行统计个数

Run

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * *  * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package com.module.flink.example.worldcount.n_004_单词相同进行统计;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Run { static Logger logger = LoggerFactory.getLogger(Run.class); public static void main(String[] args) throws Exception { final int port = 1234 ; // get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket DataStream<String> text = env.socketTextStream("localhost", port, "\n"); DataStream<WordWithCount> wordDataStream = text.flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String s, Collector<WordWithCount> collector) throws Exception { logger.info("输入数据:{}",s); // for(String word : s.split("\\\\s")){ for(String word : s.split(" ")){ //一次数据 collector.collect(new WordWithCount(word,1L)); } } }) ; DataStream<WordWithCount> wordCount = wordDataStream.keyBy("word") .reduce(new 
ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception { return new WordWithCount(a.getWord(),a.getCount() + b.getCount()); } }) ; wordCount.print().setParallelism(1); env.execute("输出nc终端数据") ; }}

标签: #java编程统计单词个数 #java统计单词数量 #java统计一段文字中单词的个数