
Debugging Flink Reading from a Kerberos-Enabled Kafka Cluster in IDEA

By FelixZh, big data practitioner


Background

Frankly, with distributed compute engines like Spark Streaming and Flink, my day-to-day workflow has always been to write the code locally (on Windows), package it, submit it straight to the YARN cluster, and fix problems as they surface; it usually takes one to five rounds to get right. I have never debugged locally in IDEA, for two reasons: first, setting up a local environment is a bit of a hassle; second, passing a local debug run and actually being able to run on YARN are two different things. Still, many customers like to debug locally in IDEA, and as a big data practitioner in a service role, FelixZh has to meet customer demand.

Environment

Kafka 2.3.0 (Kerberos enabled)

Flink 1.6.0

Functionality

Flink reads data from the Kerberos-enabled Kafka cluster and prints it to standard output.

Implementation Notes

Debugging Flink in IDEA boils down to starting a MiniCluster inside IDEA.
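Under the hood, getExecutionEnvironment() (used in the complete code below) creates exactly such a MiniCluster when the job is launched from the IDE. As a minimal sketch (the class name LocalWebUIDemo and the sample data are invented for illustration), you can also start the MiniCluster explicitly together with Flink's web UI, which is why flink-runtime-web appears in the pom.xml below:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalWebUIDemo {
    public static void main(String[] args) throws Exception {
        // Starts an embedded MiniCluster plus the Flink web UI on localhost
        // (requires flink-runtime-web on the classpath).
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.fromElements("hello", "flink").print();
        env.execute("local-minicluster-demo");
    }
}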

To access a Kerberos-enabled Kafka cluster, the client must load krb5.conf (which mainly describes the Kerberos KDC, e.g. its address) and kafka_client_jaas.conf (which supplies the information used for Kerberos authentication, such as the keytab file and the principal).
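Both files can be supplied either in code, as the complete code below does with System.setProperty, or as VM options in the IDEA run configuration. The property names java.security.krb5.conf and java.security.auth.login.config are standard JDK properties; the d:\ paths are simply this article's example locations:

-Djava.security.krb5.conf=d:\KafkaFlinkIdeaDemoConf\krb5.conf
-Djava.security.auth.login.config=d:\KafkaFlinkIdeaDemoConf\kafka_client_jaas.conf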

pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns=";         xmlns:xsi=";         xsi:schemaLocation=" ;>    <modelVersion>4.0.0</modelVersion>    <groupId>KafkaFlinkIdeaDemo</groupId>    <artifactId>KafkaFlinkIdeaDemo</artifactId>    <version>1.0-SNAPSHOT</version>    <properties>        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>        <flink.version>1.6.0</flink.version>        <scala.binary.version>2.11</scala.binary.version>    </properties>    <dependencies>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>            <version>${flink.version}</version>            <!--<scope>compile</scope>-->  <!-- idea -->            <scope>provided</scope>  <!-- yarn cluster-->        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>            <version>${flink.version}</version>            <!--<scope>compile</scope>-->  <!-- idea -->            <scope>provided</scope>  <!-- yarn cluster-->        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-runtime_${scala.binary.version}</artifactId>            <version>${flink.version}</version>            <!--<scope>compile</scope>-->  <!-- idea -->            <scope>provided</scope>  <!-- yarn cluster-->        </dependency>        <dependency>            <groupId>ch.qos.logback</groupId>            <artifactId>logback-classic</artifactId>            <version>1.2.3</version>        </dependency>    </dependencies>    <build>        <plugins>            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-compiler-plugin</artifactId>                <version>3.2</version>                <configuration>                    <source>1.8</source>                    <target>1.8</target>                </configuration>            </plugin>            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-shade-plugin</artifactId>                <version>3.0.0</version>                <executions>                    <execution>                        <phase>package</phase>                        <goals>                            <goal>shade</goal>                        </goals>                        <configuration>                            <artifactSet>                                <excludes>                                    <exclude>com.google.code.findbugs:jsr305</exclude>                                    <exclude>org.slf4j:*</exclude>                                    <exclude>log4j:*</exclude>                                </excludes>                            </artifactSet>                            <filters>                                <filter>                                    <!-- Do not copy the signatures in the META-INF folder.                                    Otherwise, this might cause SecurityExceptions when using the JAR. 
-->                                    <artifact>*:*</artifact>                                    <excludes>                                        <exclude>META-INF/*.SF</exclude>                                        <exclude>META-INF/*.DSA</exclude>                                        <exclude>META-INF/*.RSA</exclude>                                    </excludes>                                </filter>                            </filters>                            <transformers>                                <transformer                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">                                    <mainClass>KafkaFlink</mainClass>                                </transformer>                            </transformers>                        </configuration>                    </execution>                </executions>            </plugin>        </plugins>    </build></project>
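Note the scope toggling on the three Flink dependencies: for debugging in IDEA, switch them to compile (the commented-out lines) so the MiniCluster and web UI classes are on the classpath; for a flink run -m yarn-cluster submission, keep them provided so the shaded JAR does not bundle Flink classes the cluster already supplies.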

Complete Code

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class KafkaFlinkIdea {
    public static void main(String[] args) throws Exception {
        final ParameterTool para = ParameterTool.fromArgs(args);
        if (!para.has("path")) {
            System.out.println("Error: missing required argument --path /opt/your.properties");
            System.out.println("Usage: flink run -m yarn-cluster -d /opt/your.jar --path /opt/your.properties");
            System.exit(0);
        }

        // When debugging on Windows in IDEA, point the JVM at the local
        // Kerberos configuration and JAAS login configuration.
        if (OperatingSystem.IS_WINDOWS) {
            System.setProperty("java.security.auth.login.config", "d:\\KafkaFlinkIdeaDemoConf\\kafka_client_jaas.conf");
            System.setProperty("java.security.krb5.conf", "d:\\KafkaFlinkIdeaDemoConf\\krb5.conf");
        }

        ParameterTool param = ParameterTool.fromPropertiesFile(para.getRequired("path"));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        if (param.getRequired("checkpoint.enable").equals("1")) {
            env.getConfig().setUseSnapshotCompression(true);
            env.enableCheckpointing(Long.valueOf(param.getRequired("checkpoint.interval"))); // checkpoint interval from the properties file
        }
        env.getConfig().setGlobalJobParameters(param); // make parameters available in the web interface

        Properties consumerProp = new Properties();
        consumerProp.setProperty("bootstrap.servers", param.getRequired("consumer.bootstrap.servers"));
        consumerProp.setProperty("group.id", param.getRequired("consumer.group.id"));
        // Kerberos (SASL/GSSAPI) client settings for the Kafka consumer
        consumerProp.setProperty("security.protocol", "SASL_PLAINTEXT");
        consumerProp.setProperty("sasl.mechanism", "GSSAPI");
        consumerProp.setProperty("sasl.kerberos.service.name", "kafka");

        FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>(
                param.getRequired("consumer.kafka.topic"),
                new SimpleStringSchema(), consumerProp);

        DataStream<String> sourceDataStream = env.addSource(kafkaConsumer);
        sourceDataStream.print();
        env.execute();
    }
}
import java.util.Locale;

public class OperatingSystem {

    public static final String NAME;
    public static final boolean IS_WINDOWS;

    static {
        NAME = System.getProperty("os.name").toLowerCase(Locale.ROOT);
        IS_WINDOWS = NAME.startsWith("windows");
    }

    public static void main(String[] args) {
        System.out.println(OperatingSystem.NAME);
        System.out.println(OperatingSystem.IS_WINDOWS);
    }
}
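(Flink itself ships a similar utility, org.apache.flink.util.OperatingSystem; the trimmed-down copy above merely avoids reaching into Flink internals for a one-line OS check.)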

FlinkJob_Kafka.properties configuration file

consumer.bootstrap.servers=192.168.23.34:9092
consumer.group.id=test
consumer.kafka.topic=test
checkpoint.enable=1
checkpoint.interval=60000

logback.xml configuration file

<?xml version="1.0" encoding="UTF-8"?><configuration>    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">        <encoder>            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSSS} {%thread} %-5level %logger - %msg%n</pattern>        </encoder>    </appender>    <root level="INFO">        <appender-ref ref="STDOUT"/>    </root></configuration>

kafka_client_jaas.conf configuration file

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="d:\\KafkaFlinkIdeaDemoConf\\user01.keytab"
    storeKey=true
    useTicketCache=false
    serviceName="kafka"
    principal="user01";
};

Note: prepare the remaining required files in advance.

krb5.conf and user01.keytab are obtained from the Kerberos KDC.
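For reference, a krb5.conf is typically shaped like the sketch below; the realm EXAMPLE.COM and the KDC host are placeholders, not values from this article's cluster:

[libdefaults]
    default_realm = EXAMPLE.COM

[realms]
    EXAMPLE.COM = {
        kdc = kdc.example.com
        admin_server = kdc.example.com
    }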

Running in IDEA

Remember to configure the program input arguments in the IDEA run configuration.
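For example (d:\KafkaFlinkIdeaDemoConf matches the example paths used above; adjust to wherever you placed the files):

Main class:        KafkaFlinkIdea
Program arguments: --path d:\KafkaFlinkIdeaDemoConf\FlinkJob_Kafka.properties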
