龙空技术网

kudu详细架构与入门使用

大数据技术实战 769

前言:

目前你们对“apachekudu代替hdfs”可能比较珍视,看官们都想要学习一些“apachekudu代替hdfs”的相关文章。那么小编同时在网上搜集了一些对于“apachekudu代替hdfs””的相关文章,希望各位老铁们能喜欢,小伙伴们快快来学习一下吧!

1、 kudu简介1.1、kudu是什么?

简单来说:dudu是一个与hbase类似的列式存储分布式数据库。

官方给kudu的定位是:在更新更及时的基础上实现更快的数据分析

1.2、为什么需要kudu?1.2.1、hdfs与hbase数据存储的缺点

目前数据存储有了HDFS与hbase,为什么还要额外的弄一个kudu呢?

HDFS:使用列式存储格式Apache Parquet,Apache ORC,适合离线分析,不支持单条纪录级别的update操作,随机读写性能差

HBASE:可以进行高效随机读写,却并不适用于基于SQL的数据分析方向,大批量数据获取时的性能较差。

正因为HDFS与HBASE有上面这些缺点,KUDU较好的解决了HDFS与HBASE的这些缺点,它不及HDFS批处理快,也不及HBase随机读写能力强,但是反过来它比HBase批处理快(适用于OLAP的分析场景),而且比HDFS随机读写能力强(适用于实时写入或者更新的场景),这就是它能解决的问题。

架构介绍2.1、基本架构2.1.1、概念

 Table(表):一张table是数据存储在kudu的位置。Table具有schema和全局有序的primary key(主键)。Table被分为很多段,也就是tablets.

 Tablet (段):一个tablet是一张table连续的segment,与其他数据存储引擎或关系型数据的partition相似。Tablet存在副本机制,其中一个副本为leader tablet。任何副本都可以对读取进行服务,并且写入时需要在所有副本对应的tablet server之间达成一致性。

 Tablet server:存储tablet和为tablet向client提供服务。对于给定的tablet,一个tablet server充当leader,其他tablet server充当该tablet的follower副本。只有leader服务写请求,leader与follower为每个服务提供读请求。

 Master:主要用来管理元数据(元数据存储在只有一个tablet的catalog table中),即tablet与表的基本信息,监听tserver的状态

 Catalog Table: 元数据表,用来存储table(schema、locations、states)与tablet(现有的tablet列表,每个tablet及其副本所处tserver,tablet当前状态以及开始和结束键)的信息。

3、存储机制3.1 存储结构全景图3.2、存储结构解析

 一个Table包含多个Tablet,其中Tablet的数量是根据hash或者range进行设置

 一个Tablet中包含MetaData信息和多个RowSet信息

 一个Rowset中包含一个MemRowSet与0个或多个DiskRowset,其中MemRowSet存储insert的数据,一旦MemRowSet写满会flush到磁盘生成一个或多个DiskRowSet,此时MemRowSet清空。MemRowSet默认写满1G或者120s flush一次

(注意:memRowSet是行式存储,DiskRowSet是列式存储,MemRowSet基于primary key有序)。每隔tablet中会定期对一些diskrowset做compaction操作,目的是对多个diskRowSet进行重新排序,以此来使其更有序并减少diskRowSet的数量,同时在compaction的过程中慧慧resolve掉deltaStores当中的delete记录

 一个DiskRowSet包含baseData与DeltaStores两部分,其中baseData存储的数据看起来不可改变,DeltaStores中存储的是改变的数据

 DeltaStores包含一个DeltaMemStores和多个DeltaFile,其中DeltaMemStores放在内存,用来存储update与delete数据,一旦DeltaMemStores写满,会flush成DeltaFile。

当DeltaFile过多会影响查询性能,所以KUDU每隔一段时间会执行compaction操作,将其合并到baseData中,主要是resolve掉update数据。

4、kudu的工作机制

4.1、概述

1、kudu主要角色分为master与tserver

2、master主要负责:管理元数据信息,监听server,当server宕机后负责tablet的重分配

3、tserver主要负责tablet的存储与和数据的增删改查

4.2 内部实现原理图4.2、读流程4.2.1、概述

客户端将要读取的数据信息发送给master,master对其进行一定的校验,比如表是否存在,字段是否存在。Master返回元数据信息给client,然后client与tserver建立连接,通过metaData找到数据所在的RowSet,首先加载内存里面的数据(MemRowSet与DeltMemStore),然后加载磁盘里面的数据,最后返回最终数据给client.

4.2.2、详细步骤图

4.2.3、详细步骤解析

1、客户端master请求查询表指定数据

2、master对请求进行校验,校验表是否存在,schema中是否存在指定查询的字段,主键是否存在

3、master通过查询catalog Table返回表,将tablet对应的tserver信息、tserver状态等元数据信息返回给client

4、client与tserver建立连接,通过metaData找到primary key对应的RowSet。

5、首先加载RowSet内存中MemRowSet与DeltMemStore中的数据

6、然后加载磁盘中的数据,也就是DiskRowSet中的BaseData与DeltFile中的数据

7、返回数据给Client

8、继续4-7步骤,直到拿到所有数据返回给client

4.3、Insert流程4.3.1、概述

Client首先连接master,获取元数据信息。然后连接tserver,查找MemRowSet与DeltMemStore中是否存在相同primary key,如果存在,则报错;如果不存在,则将待插入的数据写入WAL日志,然后将数据写入MemRowSet。

4.3.2、详细步骤图4.3.3、详细步骤解析

1、client向master请求预写表的元数据信息

2、master会进行一定的校验,表是否存在,字段是否存在等

3、如果master校验通过,则返回表的分区、tablet与其对应的tserver给client;如果校验失败则报错给client。

4、client根据master返回的元数据信息,将请求发送给tablet对应的tserver.

5、tserver首先会查询内存中MemRowSet与DeltMemStore中是否存在与待插入数据主键相同的数据,如果存在则报错

6、tserver会讲写请求预写到WAL日志,用来server宕机后的恢复操作

7、将数据写入内存中的MemRowSet中,一旦MemRowSet的大小达到1G或120s后,MemRowSet会flush成一个或DiskRowSet,用来将数据持久化

8、返回client数据处理完毕

4.4、数据更新流程4.4.1、概述

Client首先向master请求元数据,然后根据元数据提供的tablet信息,连接tserver,根据数据所处位置的不同,有不同的操作:在内存MemRowSet中的数据,会将更新信息写入数据所在行的mutation链表中;在磁盘中的数据,会将更新信息写入DeltMemStore中。

4.4.2、详细步骤图4.4.3、详细步骤解析

1、client向master请求预更新表的元数据,首先master会校验表是否存在,字段是否存在,如果校验通过则会返回给client表的分区、tablet、tablet所在tserver信息

2、client向tserver发起更新请求

3、将更新操作预写入WAL日志,用来在server宕机后的数据恢复

4、根据tserver中待更新的数据所处位置的不同,有不同的处理方式:

如果数据在内存中,则从MemRowSet中找到数据所处的行,然后在改行的mutation链表中写入更新信息,在MemRowSet flush的时候,将更新合并到baseData中

如果数据在DiskRowSet中,则将更新信息写入DeltMemStore中,DeltMemStore达到一定大小后会flush成DeltFile。

5、更新完毕后返回消息给client。

5、KUDU的java操作5.1、导入依赖

<dependency>    <groupId>org.apache.kudu</groupId>    <artifactId>kudu-client</artifactId>    <version>${kudu.version}</version>    <scope>test</scope></dependency><!--导入kudu的客户端工具--><dependency>    <groupId>org.apache.kudu</groupId>    <artifactId>kudu-client-tools</artifactId>    <version>${kudu.version}</version></dependency>


5.2、API5.2.1、表创建

 /**     * 创建表     * @throws Exception     */    public static void createTable() throws Exception{        //1、创建一个client        KuduClient client = new KuduClientBuilder(KUDU_MASTER).build();        //2、创建schema信息        List<ColumnSchema> columns = new ArrayList<ColumnSchema>();        columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).nullable(false).build());        columns.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).key(false).nullable(false).build());        columns.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).key(false).nullable(false).build());        Schema schema = new Schema(columns);        //3、指定分区字段        List<String> partions = new ArrayList<String>();        partions.add("id");        //4、指定分区方式为hash分区、6个分区,一个副本        CreateTableOptions options = new CreateTableOptions().addHashPartitions(partions, 6).setNumReplicas(1);        //5、创建表,        client.createTable("person",schema,options);        client.close();    }


5.2.2、Insert

 /**     * 插入数据     * @throws Exception     */    public static void add() throws Exception{        //1、创建一个client        KuduClient client = new KuduClientBuilder(KUDU_MASTER).build();        //2、打开表        KuduTable table = client.openTable("person");        //3、创建一个session会话        KuduSession session = client.newSession();        //4、创建插入        Insert insert = table.newInsert();        //5、指定插入数据        insert.getRow().addInt("id",1);        insert.getRow().addInt("age",18);        insert.getRow().addString("name","张三");        //6、应用插入        session.apply(insert);        session.close();        client.close();    }


5.2.3、update

  /**     * 更新数据     * @throws Exception     */    public static void update() throws Exception{        //1、创建kudu client        KuduClient client = new KuduClientBuilder(KUDU_MASTER).build();        //2、打开表        KuduTable table = client.openTable("person");        KuduSession session = client.newSession();        Update update = table.newUpdate();        update.getRow().addInt("id",1);        update.getRow().addString("name","李四");        session.apply(update);        session.flush();        session.close();        client.close();    }


5.2.4、delete

 /**     * 删除数据     * @throws Exception     */    public static void delete() throws Exception{        //1、创建kudu client        KuduClient client = new KuduClientBuilder(KUDU_MASTER).build();        //2、打开表        KuduTable table = client.openTable("person");        KuduSession session = client.newSession();        Delete delete = table.newDelete();        delete.getRow().addInt("id",1);        session.apply(delete);        session.flush();        session.close();        client.close();    }


5.2.5、query


 /**     * 条件查询 select * from person where id=1     * @throws Exception     */    public static void query() throws Exception{        //1、创建kudu client        KuduClient client = new KuduClientBuilder(KUDU_MASTER).build();        //2、打开表        KuduTable table = client.openTable("person");        //3、创建scanner扫描器        KuduScanner.KuduScannerBuilder kuduScannerBuilder = client.newScannerBuilder(table);        //4、创建查询条件        KuduPredicate filter = KuduPredicate.newComparisonPredicate(table.getSchema().getColumn("id"), KuduPredicate.ComparisonOp.EQUAL, 1);        //5、将查询条件加入到scanner中        KuduScanner scanner = kuduScannerBuilder.addPredicate(filter).build();        //6、获取查询结果        while (scanner.hasMoreRows()){            RowResultIterator rows = scanner.nextRows();            while (rows.hasNext()){                RowResult row = rows.next();                Integer id = row.getInt("id");                String name = row.getString("name");                int age = row.getInt("age");                System.out.println(id+"---"+name+"---"+age);            }        }        //7、关闭client        client.close();    }                                                    

标签: #apachekudu代替hdfs