龙空技术网

Flink教程-flink 1.11 流式数据ORC格式写入file

大数据技术与应用实战 386

前言:

眼前姐妹们对“java写orc文件”都比较讲究,看官们都想要了解一些“java写orc文件”的相关内容。那么小编在网摘上汇集了一些关于“java写orc文件””的相关内容,希望小伙伴们能喜欢,看官们快快来了解一下吧!

StreamingFileSink简介写入orc工厂类向量化操作构造OrcBulkWriterFactory实例讲解构造

在flink中,StreamingFileSink是一个很重要的把流式数据写入文件系统的sink,可以支持写入行格式(json,csv等)的数据,以及列格式(orc、parquet)的数据。 hive作为一个广泛的数据存储,而ORC作为hive经过特殊优化的列式存储格式,在hive的存储格式中占有很重要的地位。今天我们主要讲一下使用StreamingFileSink将流式数据以ORC的格式写入文件系统,这个功能是flink 1.11版本开始支持的。

StreamingFileSink简介

StreamingFileSink提供了两个静态方法来构造相应的sink,forRowFormat用来构造写入行格式数据的sink,forBulkFormat方法用来构造写入列格式数据的sink,

我们看一下方法forBulkFormat。

 public static <IN> StreamingFileSink.DefaultBulkFormatBuilder<IN> forBulkFormat(   final Path basePath, final BulkWriter.Factory<IN> writerFactory) {  return new StreamingFileSink.DefaultBulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketAssigner<>()); }

这里需要两个参数,第一个是一个写入的路径,第二个是一个用于创建writer的实现BulkWriter.Factory接口的工厂类。

写入orc工厂类

首先我们要引入相应的pom

<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-orc_2.11</artifactId>  <version>1.11.0</version></dependency>

flink为我们提供了写入orc格式的工厂类OrcBulkWriterFactory,我们简单看下这个工厂类的一些变量。

@PublicEvolvingpublic class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> { private static final Path FIXED_PATH = new Path("."); private final Vectorizer<T> vectorizer; private final Properties writerProperties; private final Map<String, String> confMap; private OrcFile.WriterOptions writerOptions;  public OrcBulkWriterFactory(Vectorizer<T> vectorizer) {  this(vectorizer, new Configuration()); } public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration) {  this(vectorizer, null, configuration); } public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration) {        ................... }    .............}
向量化操作

flink使用了hive的VectorizedRowBatch来写入ORC格式的数据,所以需要把输入数据组织成VectorizedRowBatch对象,而这个转换的功能就是由OrcBulkWriterFactory中的变量---也就是抽象类Vectorizer类完成的,主要实现的方法就是org.apache.flink.orc.vector.Vectorizer#vectorize方法。

在flink中,提供了一个支持RowData输入格式的RowDataVectorizer,在方法vectorize中,根据不同的类型,将输入的RowData格式的数据转成VectorizedRowBatch类型。

 @Override public void vectorize(RowData row, VectorizedRowBatch batch) {  int rowId = batch.size++;  for (int i = 0; i < row.getArity(); ++i) {   setColumn(rowId, batch.cols[i], fieldTypes[i], row, i);  } }
如果用户想将自己的输入格式以orc格式写入,那么需要继承抽象类Vectorizer,并且实现自己的转换方法vectorize。如果用户在写入orc文件之后,想添加一些自己的元数据信息,可以覆盖org.apache.flink.orc.vector.Vectorizer#addUserMetadata方法来添加相应的信息。

构造OrcBulkWriterFactory

工厂类一共提供了三个构造方法,我们看到最全的一个构造方法一共接受三个参数,第一个就是我们上面讲到的Vectorizer对象,第二个是一个写入orc格式的配置属性,第三个是hadoop的配置文件.

写入的配置来自,具体可以是以下的值.

key 缺省值 注释 orc.compress ZLIB high level compression = {NONE, ZLIB, SNAPPY} orc.compress.size 262,144 compression chunk size orc.stripe.size 67,108,864 memory buffer in bytes for writing orc.row.index.stride 10,000 number of rows between index entries orc.create.index true create indexes? orc.bloom.filter.columns ”” comma separated list of column names orc.bloom.filter.fpp 0.05 bloom filter false positive rate

实例讲解

最后,我们通过一个简单的实例来讲解一下具体的使用。

构造source

首先我们自定义一个source,模拟生成RowData数据,我们这个也比较简单,主要是生成了一个int和double类型的随机数.

 public static class MySource implements SourceFunction<RowData>{  @Override  public void run(SourceContext<RowData> sourceContext) throws Exception{   while (true){    GenericRowData rowData = new GenericRowData(2);    rowData.setField(0, (int) (Math.random() * 100));    rowData.setField(1, Math.random() * 100);    sourceContext.collect(rowData);    Thread.sleep(10);   }  }  @Override  public void cancel(){  } }
构造OrcBulkWriterFactory

接下来定义构造OrcBulkWriterFactory需要的参数。

  //写入orc格式的属性  final Properties writerProps = new Properties();  writerProps.setProperty("orc.compress", "LZ4");  //定义类型和字段名  LogicalType[] orcTypes = new LogicalType[]{new IntType(), new DoubleType()};  String[] fields = new String[]{"a1", "b2"};  TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(RowType.of(    orcTypes,    fields));  //构造工厂类OrcBulkWriterFactory  final OrcBulkWriterFactory<RowData> factory = new OrcBulkWriterFactory<>(    new RowDataVectorizer(typeDescription.toString(), orcTypes),    writerProps,    new Configuration());
构造StreamingFileSink
  StreamingFileSink orcSink = StreamingFileSink    .forBulkFormat(new Path(";), factory)    .build();

完整的代码请参考:

更多精彩内容,欢迎关注我公众号:【大数据技术与应用实战】

标签: #java写orc文件