Background
Frankly, for distributed compute engines such as Spark Streaming and Flink, my day-to-day workflow has always been: write the code locally (on Windows), package it, submit it straight to the YARN cluster, and fix problems as they appear; it usually works within one to five attempts. I had never debugged in local IDEA, for two reasons: setting up a local environment is somewhat tedious, and passing a local debug run is not the same thing as running successfully on YARN. Still, many customers like to debug locally in IDEA, and since FelixZh works in big data as a service role, customer needs must be met.
Environment
Kafka 2.3.0 (with Kerberos enabled)
Flink 1.6.0
Functionality
Flink reads data from a Kerberos-enabled Kafka topic and prints it to stdout.
Implementation Notes
Debugging Flink in IDEA amounts to starting a MiniCluster inside IDEA.
To access a Kerberos-enabled Kafka cluster, the JVM must load krb5.conf (which describes the Kerberos KDC, e.g. its address) and kafka_client_jaas.conf (which supplies the information used for Kerberos authentication, such as the keytab file and principal).
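For reference, a minimal krb5.conf sketch is shown below. The realm name and KDC host are placeholders, not values from this setup; substitute your cluster's actual realm and KDC address:

[libdefaults]
    default_realm = EXAMPLE.COM
    dns_lookup_kdc = false

[realms]
    EXAMPLE.COM = {
        kdc = kdc.example.com
        admin_server = kdc.example.com
    }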
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>KafkaFlinkIdeaDemo</groupId>
    <artifactId>KafkaFlinkIdeaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.6.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>compile</scope>-->  <!-- use when running in IDEA -->
            <scope>provided</scope>        <!-- use when submitting to the YARN cluster -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>compile</scope>-->  <!-- use when running in IDEA -->
            <scope>provided</scope>        <!-- use when submitting to the YARN cluster -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>compile</scope>-->  <!-- use when running in IDEA -->
            <scope>provided</scope>        <!-- use when submitting to the YARN cluster -->
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>KafkaFlinkIdea</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
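As the comments in the pom indicate, the Flink dependencies must be switched between compile scope (so the classes are on the classpath when running inside IDEA) and provided scope (so the shaded jar submitted to YARN does not bundle classes the cluster already supplies). As an alternative to editing the pom each time, recent IDEA versions offer an "Include dependencies with 'Provided' scope" checkbox in the Application run configuration; verify that option exists in your IDEA version before relying on it.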
Complete Code
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class KafkaFlinkIdea {
    public static void main(String[] args) throws Exception {
        final ParameterTool para = ParameterTool.fromArgs(args);
        if (!para.has("path")) {
            System.out.println("Error: missing required argument --path /opt/your.properties");
            System.out.println("Usage: flink run -m yarn-cluster -d /opt/your.jar --path /opt/your.properties");
            System.exit(1);
        }

        // On Windows (i.e. when debugging in IDEA), point the JVM at the local
        // Kerberos configuration files before the Kafka client is created.
        if (OperatingSystem.IS_WINDOWS) {
            System.setProperty("java.security.auth.login.config", "d:\\KafkaFlinkIdeaDemoConf\\kafka_client_jaas.conf");
            System.setProperty("java.security.krb5.conf", "d:\\KafkaFlinkIdeaDemoConf\\krb5.conf");
        }

        ParameterTool param = ParameterTool.fromPropertiesFile(para.getRequired("path"));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        if (param.getRequired("checkpoint.enable").equals("1")) {
            env.getConfig().setUseSnapshotCompression(true);
            // checkpoint.interval is in milliseconds (60000 = one checkpoint per minute)
            env.enableCheckpointing(Long.valueOf(param.getRequired("checkpoint.interval")));
        }
        env.getConfig().setGlobalJobParameters(param); // make parameters available in the web interface

        Properties consumerProp = new Properties();
        consumerProp.setProperty("bootstrap.servers", param.getRequired("consumer.bootstrap.servers"));
        consumerProp.setProperty("group.id", param.getRequired("consumer.group.id"));
        consumerProp.setProperty("security.protocol", "SASL_PLAINTEXT");
        consumerProp.setProperty("sasl.mechanism", "GSSAPI");
        consumerProp.setProperty("sasl.kerberos.service.name", "kafka");

        FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>(
                param.getRequired("consumer.kafka.topic"), new SimpleStringSchema(), consumerProp);

        DataStream<String> sourceDataStream = env.addSource(kafkaConsumer);
        sourceDataStream.print();

        env.execute();
    }
}
import java.util.Locale;

// Small OS-detection helper: the main class uses IS_WINDOWS to decide whether
// to load the local Kerberos configuration files for IDEA debugging.
public class OperatingSystem {

    public static final String NAME;
    public static final boolean IS_WINDOWS;

    static {
        NAME = System.getProperty("os.name").toLowerCase(Locale.ROOT);
        IS_WINDOWS = NAME.startsWith("windows");
    }

    public OperatingSystem() {
    }

    public static void main(String[] args) {
        System.out.println(OperatingSystem.NAME);
        System.out.println(OperatingSystem.IS_WINDOWS);
    }
}
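Note that this hand-rolled helper mirrors a utility Flink itself ships: to my knowledge, flink-core contains org.apache.flink.util.OperatingSystem with a static isWindows() check that could be used instead; verify against your Flink version before substituting it.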
FlinkJob_Kafka.properties
consumer.bootstrap.servers=192.168.23.34:9092
consumer.group.id=test
consumer.kafka.topic=test
checkpoint.enable=1
checkpoint.interval=60000
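With checkpoint.enable=1, the job turns on checkpointing with snapshot compression; checkpoint.interval is in milliseconds, so the value 60000 above triggers a checkpoint every 60 seconds.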
logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSSS} {%thread} %-5level %logger - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
kafka_client_jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="d:\\KafkaFlinkIdeaDemoConf\\user01.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="user01";
};
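A caveat on the entry above: depending on your krb5.conf, the short principal user01 may need to be written in full form (e.g. user01@YOUR.REALM) if no default_realm is configured, and the keyTab path must match where you actually place user01.keytab on the Windows machine.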
Note: prepare the other required files (the keytab and krb5.conf) in advance.
Running in IDEA
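Create an Application run configuration for KafkaFlinkIdea and pass the properties file as a program argument, for example --path d:\KafkaFlinkIdeaDemoConf\FlinkJob_Kafka.properties (the d:\KafkaFlinkIdeaDemoConf directory is the one assumed throughout this article; adjust to your own layout). With the Kerberos files in place, running the class starts the MiniCluster and prints the consumed Kafka records to the IDEA console.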