龙空技术网

SparkSQL中读取数据和写出数据

爱做梦的程序员 160

前言:

当前兄弟们对“sql语句中if的用法”大概比较关切,你们都需要分析一些“sql语句中if的用法”的相关内容。那么小编同时在网上收集了一些对于“sql语句中if的用法””的相关知识,希望大家能喜欢,小伙伴们一起来学习一下吧!

1.1 Spark 读取数据的统一入口:spark.read.format(指定的格式).load(文件的路径)或者spark.read.格式的名称(文件的路径)1.2 Spark 写出数据的统一出口:DataFrame.write.format(保存为什么格式).save(保存到哪个路径)或者DataFrame.write.保存的格式(保存到哪个路径)1.3 Spark 写出数据有 4 种方式:append: 如果数据源或者表已经存在,继续 追加overwrite: 如果数据源已经存在, 覆盖 写出ignore: 如果数据源已经存在,将忽略(ignore) DataFrame中的数据,如果不存在,则创建并写出。官网的比喻是类似这条SQL语句:create table if not existserrorifexists: 如果数据源(可以理解为文件)已经存在,将抛出异常2.1 SparkSQL读取数据的综合案例:

from pyspark.sql import SparkSessionif __name__ == '__main__':    # 创建上下文对象    spark = SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions', '6').getOrCreate()        # 1.1-读取json文件并打印    print('json:')    df1=spark.read.format('json').load(';)    df1.printSchema()    df1.show()    # 1.2-写法二    df2=spark.read.json(';)    df2.printSchema()    df2.show()    # 2-读取csv文件并打印    print('csv:')    df3=spark.read.option('sep',';').option('header',True).option('inferSchema',True).csv(';)    df3.printSchema()    df3.show()    # 3-读取parquet文件并打印    print('parquet:')    # 下面3种写法效果一样    df4=spark.read.format('parquet').load(';)    df4.printSchema()    df4.show()    df5=spark.read.load(';)    df5.printSchema()    df5.show()    df6=spark.read.parquet(';)    df6.printSchema()    df6.show()    # 4-读取jdbc-mysql文件并打印    print('jdbc:')    df7=spark.read.format('jdbc')\        .option('url','jdbc:mysql://node1:3306')\        .option('user','root')\        .option('password','123456')\        .option('dbtable','bigdata.people')\        .load()    df7.printSchema()    df7.show()
2.2 SparkSQL写出数据的综合案例:
from pyspark.sql import SparkSessionif __name__ == '__main__':    # 创建上下文对象    spark=SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions','6').getOrCreate()    sc=spark.sparkContext    # 使用SparkContext,读取txt形成RDD,转换成DataFrame    rdd1=sc.textFile(';)    rdd2=rdd1.map(lambda str:(str.split(',')[0],int(str.split(',')[1].strip())))    df=rdd2.toDF(['name','age'])    # 并打印schema和数据    df.printSchema()    df.show()    # 1.1-将DataFrame保存成为json文件,    df.coalesce(1).write.format('json').mode('overwrite').save(';)    # 1.2-写法二    df.coalesce(1).write.mode('overwrite').json(';)    # 2-将DataFrame保存成为csv文件    df.coalesce(1).write.mode('overwrite')\        .option('header',True)\        .option('sep',';')\        .csv(';)    # 3-将DataFrame保存成为parquet文件(SparkSQL模块中默认读取数据文件格式就是parquet列式存储数据)    df.coalesce(1).write.mode('overwrite').save(';)    # 4-将DataFrame保存成为jdbc-mysql数据    df.write.format('jdbc').mode('overwrite') \        .option('url', 'jdbc:mysql://node1:3306') \        .option('user', 'root') \        .option('password', '123456') \        .option('dbtable', 'bigdata.people') \        .save()

标签: #sql语句中if的用法