龙空技术网

大数据笔试真题——第一章:通用面试题

千锋IT小助手 102

前言:

当前看官们对“spark考试题库”大约比较重视,我们都想要分析一些“spark考试题库”的相关知识。那么小编在网上网罗了一些有关“spark考试题库””的相关知识,希望朋友们能喜欢,我们一起来了解一下吧!

试题总体概述

面试题包含时下流行的多个大数据工具和概念(spark、sparkStreaming、kafka、hadoop、hive、hbase、redis、flume、sqoop、zk、azkaban、kylin、Elatic Search)的原理介绍、实战总结、调优方式等,我会不间断的更新,维护,希望可以对正在找大数据工作的朋友们有所帮助.

第一章目录第一章 通用1.1 数据倾斜

当某个job长时间运行没有结束,可能发生了数据倾斜。

1.1.1 hive

设置map端聚合和二次group by保证reduce数据大概平均,然后再设置reduce数量减少每个reduce的数据量

尽量少用distinct,不仅吃不到map端聚合(distinct原理是全局排序去重),而且多个distinct也吃不到二次group的优化。

如果group by 多个字段,或者其它二次group失效的情况,可以走下方spark的解决方案,将hql分多个hql来做。

1.1.2 Spark

比起hive来说,spark对数据倾斜的优化可以更细一些(也可以说更麻烦些)。

先用sample(false,0.x)采样key,找出倾斜的key把数据集拆成倾斜的部分和不倾斜的部分,不倾斜的部分走正常流程倾斜的部分key前面加上一个定长的随机字符串,然后执行重分区重分区后进行一个聚合操作,然后去除定长前缀再聚合一次。如果是大表join大表,其中一个表有数据倾斜,就需要用膨胀法,将倾斜部分的key加上一个0-n的前缀,一条数据膨胀成n条,然后将另一个表的这部分key也加上相应的前缀,然后单独对这部分数据进行一次双重聚合,与不倾斜的数据进行union操作,完成聚合。空值看作是特殊的key,空值多了一样用3的方法去解决。1.2 海量数据1.2.1 如何调优

  1.抽样检测分组字段,看看是否有倾斜  2.如果没有倾斜,就正常增加reduce数量,设置中间ORC+SNAPPY压缩  3.如果有倾斜,把倾斜的部分过滤出来加前缀打散处理,不倾斜的部分正常处理。  4.如果是大表join小表,大表倾斜,可以使用map端join方法,小表倾斜直接无视。  5.如果是大表join大表,某表倾斜,可以使用膨胀法处理。
1.2.2 实时排序

跳表法快速排序

  将值域划分区间建有序链表,原本海量的遍历会被压缩N倍。每个值域对应一个有序链表,因此是双重链表。  前提:除了跳表外还有一张作品表,以作品id为key,value含分数  值变更时,先查询作品表更新原值,通过原值到对应区间的链表中删除该值,新值插入对应区间的链表中。  求排名:1.每个区间链表除了保存作品和分数外,还额外维护一个区间总长          2.每次新值插入时,都将原区间总长-1,新区间总长+1          3.求排名时遍历第一层链表将区间总长累加,然后遍历该值所在的区间求出区间内排名,两者相加。            求TopN:第二层链表一路往下走N个。  缺点:事实上所有针对数据直接排序的方式都存在锁问题,每次更新只能有一个线程做,否则必然会乱。

平衡二叉树排序

  假设总分为5                 [0,5]  1.将总分一分为二            [0,2],[3,5]  2.将划分的部分继续二分      [0,1],[1,2],[3,4],[4,5]  3.重复2直到划分到底         [0,0],[1,1],[2,2],[3,3]...  ​  采用链表的方式从顶点往下一路关联起来,每个区间key对应一个人数value。  假设有某个值从2变成3。  先从顶点开始往下遍历查找[2,2],遍历过程中经过的节点人数全部-1,终点[2,2]的人数也-1;  然后从顶点开始往下遍历查找[3,3],经过的节点人数全部+1。终点也+1。  求排名:假设要求分数为3的排名,从起点开始往下查找[3,3],如果是往左走,就把右值累加上,如果往右走则不加。  优点:毫秒级响应排名,更新不需要锁表,因为是单纯的加减日志。  缺点:消耗大量内存存储,每次更新都要改变N个值,每次更新值时还需要把排名添加到TopN表里。

1.2.3 求交集

  理论:  如果是k-v形式的数据,或者是可以提取key的数据,可以使用分组法比较。  假设我有x G内存,要求A,B的交集。  先把A,B分别分组拆分成多个小文件,每个小文件的大小为(x-300)/2,超过的部分再水平拆分成多个文件。  然后按分组读入A,B的数据,在内存中比较,求出交集部分。  ​  如果分组太细,就按分组key的哈希进行分区,一次比较多个分组。  为了方便小文件的合并,可以在输出时将数据排序输出,创建索引文件,后续可以采用归并排序的方式快速合并。  ​  如果不能提取key,则按照hash分组。  ​  实战:假设已经把两张表要求交集的部分字段拼接成"key"  select * from A join B on A.key = B.key
1.2.4 求差集
  select * from A full outer join B on A.key = B.key where A.key is null or B.key is null;
1.3 布隆过滤器 原理

将字符串用哈希函数转换为一个或多个整型值,将bit型数组中对应位置上的0改为1。判断该字符串是否存在时,只需要判断这些位置上的值是否都为1,如果不是就说明一定不存在。但是反过来不能说明一定存在。

如:abc 转换为3和5,就将arr[3]和arr[5]的值设置为1,只要这两个值都为1,就说明abc可能存在,如果它们不全为1,可以保证abc一定不存在。

1.4 存储格式

TextFile

默认格式,按行存储,可以压缩但压缩文件不支持分片,反序列化开销是SequenceFile的几十倍(需要判断分隔符和换行符)

SequenceFile

hadoop原生支持,将kv以二进制方式按行存储,压缩后的文件支持压缩。默认以record压缩,可以改为block性能更好。压缩率很低,查询速度一般。

RCFile

按行分块、按列存储的存储方式,反序列化速度较慢,但压缩率和查询速度最快。

ORC file

RC的改良版,每个Task输出单文件、存储索引、支付复杂类型、支持块压缩、可以直接读取,ORC比RC高效很多。

Parquet

列式存储,是spark的默认存储格式,压缩和查询性能比ORC稍差,但是支持的编码更多,而且对嵌套式结构支持的更好(json)。

因此对结构化数仓来说ORC file格式更好,对灵活的spark作业来说Parquet格式更好。

1.5 压缩格式

上面虽然提到了压缩比,但只不过是相对于纯文本,在数据的存储方式上带来的数据量减少,并不是真正的压缩方式。

下方介绍在这些存储方式之上进一步减少数据量的压缩方式。

gzip

spark默认压缩方式,hadoop原生支持,压缩比率很高、压缩和解压速度很快,支持纯文本式编辑,使用方便,但是压缩后不支持分片,因此适用于1个块内大小的文件。

lzo

hadoop流行的压缩方式,需要安装,压缩解压速度较快,压缩率适中,建立索引后支持分片,压缩单个大文件时可以使用。

snappy

高速压缩和解压,压缩率较低,需要安装,不支持分片,推荐作为临时数据的压缩方式。

bzip2

非常高的压缩率,但解压速度很慢,支持分片。hadoop本身支持,但本地库不支持。一般和lzo选其中一个作为数据源的压缩格式。

1.6 手写算法1.6.1 四大排序

速度上:改良的归并排序算法是n-nlogn,快速排序的时间复杂度是nlogn-n^2,冒泡和选择都是n^2。

稳定性:快速、选择都不稳定,冒泡和归并都稳定

空间上:冒泡和选择都是1,快速为logn,归并为n

  /**    * 一个完整的快速排序的方法    * 可以传入任意类型的buffer对其排序后返回    * 可以传入一个比较器对自定义类型排序    *    * @author 孤星魅影    * @param buffer 任意类型的buffer    * @param isASC  是否正序排序    * @param ev     自定义比较器    * @return 返回一个快速排序后的buffer    */  def quickSort[T: ClassTag](buffer: mutable.Buffer[T])(implicit isASC: Boolean = true, ev: T => Comparable[T]): mutable.Buffer[T] = {    //为null、为空、长度为1时都返回buffer    if (buffer == null || buffer.length <= 1)      return buffer        //1.将数组的第一个数head与后面所有的数进行比较,左边放比head小的数,右边放比head大的数(从小到大)    //2.递归执行拆分,直到数组只剩下1个    //3.按顺序将左、中、右三者拼接起来,完成排序    val (left, right) = buffer.tail.partition(t => {      if (isASC) t.compareTo(buffer.head) < 0      else t.compareTo(buffer.head) > 0    })    quickSort(left) += buffer.head ++= quickSort(right)  }

1.6.2 懒汉式单例

  scala  object{    lazy val obj = new T()  }  ​  java  class T{    private static T obj = null;    private T(){}        //先执行一次null检验,然后上锁再执行一次null检验,防止同时有两个线程通过了第一次null检验导致异常。    public T getObj()={      if(obj==null){        sysconized(obj){          if(obj == null)            obj = new T();        }      }      return obj;    }  }

1.6.3 ip转int

  a,b,c,d = a*256*256*256+b*256*256+c*256+123  ip是256进制,所以a需要*3个256才能保证每个bcd不重复,b,c,d同理,使用位计算效率更高:*256相当于<<8  ​  提供一段scala代码:  def ip2long(ip:String)=ip.split(".").reverse.zipWithIndex.map(v=>v._1<<(8*v._2)).reduce(_+_)  ​  可以提取转换后的数字的前三位进行分组查询,效率更高:  ip.groupby(_.substring(0,3))

1.6.4 求质数

  /**      * 求min - max以内的所有质数      * 只需要判断一个数是否能被自身以外的大于1的质数数整除就可以了      * @return      */    def countPrime(min:Int,max:Int):Array[Int]={      val arr = mutable.Set[Int](1)      (min to max).foreach(index=> {        if(!arr.exists(prime=>{          //某个数如果能被除自己和1以外的质数整除,说明该数不是质数          if(prime>1 && index != prime && index % prime == 0) true          else false        }))          arr += index      })      arr.toArray.sorted    }

1.6.5 判断平衡二叉树

  原理:最底层的左节点和右节点相差为1(左右子树相差为1),例子可以看上方海量数据实时排序部分  方法:从根节点开始,判断左子树与右子树是否差1,然后对左右子树不断递归这个判断直到没有子树。

1.6.6 Java 生产者和消费者

  package third;  ​  import static java.lang.Thread.sleep;  ​  public class SellShaoBing {      private int shaoBingAmount=0;      private boolean isStop = false;      private int maxAmount;      public SellShaoBing(int max){          maxAmount = max;      }      private Thread createProductThread(){          return new Thread(()->{              while (!isStop) {                  synchronized (this) {                      //留个缓冲,做满之前就开始卖                      if (shaoBingAmount > maxAmount - 5) notifyAll();                      waitMethod(shaoBingAmount == maxAmount);                      sleepMethod();                      System.out.println("做烧饼");                      shaoBingAmount++;                      System.out.println("当前烧饼数量"+shaoBingAmount);                  }              }          });      }      private Thread createConsumerThread(){          return new Thread(()->{              while (!isStop) {                  synchronized (this) {                      //留个缓冲,烧饼卖完之间就通知生产                      if (shaoBingAmount < maxAmount /4) notifyAll();                      waitMethod(shaoBingAmount == 0);                      sleepMethod();                      System.out.println("卖烧饼");                      shaoBingAmount--;                      System.out.println("当前烧饼数量"+shaoBingAmount);                  }              }          });      }      public void actions(){         Thread product =  createProductThread();         Thread consumer =  createConsumerThread();         consumer.start();         product.start();          try {              sleep(5000);          } catch (InterruptedException e) {              e.printStackTrace();              Thread.currentThread().interrupt();          }          isStop = true;      }  ​      private synchronized void waitMethod(boolean iswait){          if (iswait) {              try {                  wait();              } catch (InterruptedException e) {                  e.printStackTrace();                  Thread.currentThread().interrupt();              }          }      }      private void sleepMethod(){          try {              sleep(100);          } catch (InterruptedException e) {              e.printStackTrace();              Thread.currentThread().interrupt();          }      }  }  ​

1.6.7 Scala 生产者和消费者(推荐)

  package utils  ​  //卖烧饼(Java生产者消费者的Scala实现)  case class ShaoBing(var amount:Int=0,maxAmount:Int=30,isContinue:Boolean=true)  object ShaoBing{      /**        * 烧饼线程模板       * @param isNotify 传入一个唤醒所有线程的判断方法       * @param isWait   传入一个使当前线程等待的方法       * @param doMethod 传入一个增减烧饼数量的方法       * @param shaoBing 传入一个烧饼对象       */      def createThread(isNotify:ShaoBing => Boolean , isWait:ShaoBing => Boolean , doMethod:ShaoBing => Unit)(shaoBing:ShaoBing)={          while (shaoBing.isContinue){              //notify和wait都需要在同步代码块中才能使用,java 的 synchronized(obj){...} = scala 的 obj.synchronized{...}              shaoBing.synchronized{              if (isNotify(shaoBing)) shaoBing.notifyAll()              //传入等待条件,使线程等待              if (isWait(shaoBing)) shaoBing.wait()              //增加或减少烧饼数量时才上锁                  doMethod(shaoBing)              }              println("当前烧饼数量:"+shaoBing.amount)              Thread.sleep(1000)          }      }  ​      //做烧饼      val createProducerThread = createThread(          //做满之前就开始卖          isNotify = shaoBing=>shaoBing.amount > shaoBing.maxAmount-5,          //做满之后就等          isWait = shaoBing=>shaoBing.amount == shaoBing.maxAmount,          _.amount += 1) _  ​      //卖烧饼      val createConsumerThread = createThread(          //卖到剩四分之一时候提前开始生产烧饼          isNotify = shaoBing=>shaoBing.amount < shaoBing.maxAmount/4,          //卖光了就停止卖烧饼          isWait = shaoBing=>shaoBing.amount == 0,          _.amount -= 1) _                  //启动生产者和消费者      def main(args:Array[String]):Unit={          val shaoBing = ShaoBing()          val producer = new Thread(new Runnable {override def run(): Unit = createProducerThread(shaoBing)})          val consumer = new Thread(new Runnable {override def run(): Unit = createConsumerThread(shaoBing)})          producer.start()          consumer.start()      }  }
1.7 线程模型

IO共有四种模型:同步阻塞、同步非阻塞、异步阻塞、异步非阻塞

同步阻塞:系统内核做好读写数据的准备之前,用户线程一直等待。

同步非阻塞:用户线程使用一个循环不断询问内核是否准备就绪,在准备就绪以前会一直进行该循环。

异步阻塞:用户线程启动一个新的线程去同步阻塞,自己则做下一件事,新的线程不断询问系统内核并阻塞。

异步非阻塞:当用户线程收到通知时,数据已经被内核读取完毕,并放在了用户线程指定的缓冲区内,内核在IO完成后通知用户线程直接使用即可

标签: #spark考试题库