龙空技术网

解析Spark读取hdfs的文件如何分区

二十岁背包 153

前言:

此时姐妹们对“hdfs文件切片算法”大致比较重视,同学们都需要知道一些“hdfs文件切片算法”的相关内容。那么小编也在网络上网罗了一些有关“hdfs文件切片算法””的相关内容,希望各位老铁们能喜欢,咱们快快来了解一下吧!

spark读取hdfs的文件是怎么分区的,读取代码如下:

val df = sc.textFile("data/wc.txt",3)

闪闪发光的星星灰尘散景背景上的足迹

一.分析

spark读取hdfs的文件分区跟hadoop的分区完全相同,因为底层使用的就是Hadoop的TextInputFormat,考虑两内容:

1)关于文件分区数量计算:

指定的预分区数量是最小分区数量,如:代码中的参数3。

真正的分区计算: 每个分区字节数 = 文件字节数/预分区数量

如果没有整除,判断余数是否大于分区字节数 * 0.1,如果大于则会新增一个分区,剩余的放在这个分区。否则不会新加分区,把余数放到最后一个分区里。

2)分区数据如何读取:

分区读取数据是按行读取,但是会考虑文件的偏移量(offset)的设置。虽然第一个分区字节数不包含一整行,但是会读取一整行。当某个分区的偏移量全被之前的分读走了,这个分区就是空的。

注意:

1.当位移量读取了回撤换行,会把下一行的数据也会读取。

2.当读取多个文件时,会把所有文件字节加起来计算分区,但是读取的时候不会夸文件读取。

二.代码分析

1)读取文件数据时,数据是按照Hadoop文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体Spark核心源码如下,FileInputFormat类中的方法:

public InputSplit[] getSplits(JobConf job, int numSplits)

throws IOException {

long totalSize = 0; // compute total size

for (FileStatus file: files) {// check we have valid files

if (file.isDirectory()) {

throw new IOException("Not a file: "+ file.getPath());

}

totalSize += file.getLen();

}

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.

FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

...

for (FileStatus file: files) {

...

if (isSplitable(fs, path)) {

long blockSize = file.getBlockSize();

long splitSize = computeSplitSize(goalSize, minSize, blockSize);

...

}

protected long computeSplitSize(long goalSize, long minSize,long blockSize) {

return Math.max(minSize, Math.min(goalSize, blockSize));

2) 分区数据读取的代码LineReader

public LineRecordReader(Configuration job, FileSplit split,    byte[] recordDelimiter) throws IOException {  this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.    LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);  start = split.getStart();  end = start + split.getLength();  final Path file = split.getPath();  compressionCodecs = new CompressionCodecFactory(job);  codec = compressionCodecs.getCodec(file);  // open the file and seek to the start of the split  final FileSystem fs = file.getFileSystem(job);  fileIn = fs.open(file);  if (isCompressedInput()) {    decompressor = CodecPool.getDecompressor(codec);    if (codec instanceof SplittableCompressionCodec) {      final SplitCompressionInputStream cIn =        ((SplittableCompressionCodec)codec).createInputStream(          fileIn, decompressor, start, end,          SplittableCompressionCodec.READ_MODE.BYBLOCK);      in = new CompressedSplitLineReader(cIn, job, recordDelimiter);      start = cIn.getAdjustedStart();      end = cIn.getAdjustedEnd();      filePosition = cIn; // take pos from compressed stream    } else {      in = new SplitLineReader(codec.createInputStream(fileIn,          decompressor), job, recordDelimiter);      filePosition = fileIn;    }  } else {    fileIn.seek(start);    in = new UncompressedSplitLineReader(        fileIn, job, recordDelimiter, split.getLength());    filePosition = fileIn;  }

三案例分析

案例:读取文件重新分区,再写入到文件

代码:

val conf = new Configuration()val fs = FileSystem.getLocal(conf)fs.delete(new Path("data/wc.out2"),true)val df = sc.textFile("data/wc.txt",3)df.saveAsTextFile("data/wc.out2")

 文件内容:   d   a   y   u   e   e   t 1)计算分区: 总字节数:14;指定预分区为3. 目标分区大小: goalSize = 14/3 =4, 余2; 2/4>0.1,所以会生成4个分区,每个分区大小是4个字节。每个分区读取偏移量入下:   0,4   4,8   8,12   12, 132)分区读取内容 按行读取 文件内容的offset为: 0 1 2 3 4 5 6 7 8 9 10 11 12  第1个分区读取:0到4,读取前3行 ;注意:当位移量偏过换行符时,会把下一行的数据也会读取了  第2个分区读取:4到8,因为4,5被读去了,从来6开始读到9。(因为按行读取所以读到9)  第3个分区读取:8到12,因为8,9被读去了,从来10开始读到12。  第4个分区读取:12到13 所以为

四.总结

spark读取hdfs文件分区比较复杂,需要仔细研究研究。

标签: #hdfs文件切片算法