Preface:
This article looks at how to connect to a wide range of databases from Python and how to perform the corresponding CRUD (create, read, update, delete) operations. We go through MySQL, SQL Server, Oracle, PostgreSQL, MongoDB, SQLite, DB2, Microsoft Access, Cassandra, Redis, ElasticSearch, Neo4j, InfluxDB, Snowflake, Amazon DynamoDB, and Microsoft Azure Cosmos DB in turn, showing for each one how to connect and how to run the CRUD operations.
Connecting to MySQL
Python can connect to a MySQL database with the mysql-connector-python library:
    import mysql.connector

    conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
    print("Opened MySQL database successfully")
    conn.close()
CRUD operations
Next, we show how to perform the basic CRUD operations in MySQL.
Create
    conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)")
    print("Table created successfully")
    conn.close()
Read (Retrieve)
    conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, address, salary from Employees")
    rows = cursor.fetchall()
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("ADDRESS = ", row[2])
        print("SALARY = ", row[3])
    conn.close()
Update
    conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
    cursor = conn.cursor()
    cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
    conn.commit()
    print("Total number of rows updated :", cursor.rowcount)
    conn.close()
Delete
    conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
    cursor = conn.cursor()
    cursor.execute("DELETE from Employees where ID = 1")
    conn.commit()
    print("Total number of rows deleted :", cursor.rowcount)
    conn.close()
Connecting to SQL Server
Python can connect to a SQL Server database with the pyodbc library:
    import pyodbc

    conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
    print("Opened SQL Server database successfully")
    conn.close()
CRUD operations
Next, we show how to perform the basic CRUD operations in SQL Server.
Create
    conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME VARCHAR(20) NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)")
    conn.commit()
    print("Table created successfully")
    conn.close()
Read (Retrieve)
    conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, address, salary from Employees")
    rows = cursor.fetchall()
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("ADDRESS = ", row[2])
        print("SALARY = ", row[3])
    conn.close()
Update
    conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
    cursor = conn.cursor()
    cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
    conn.commit()
    print("Total number of rows updated :", cursor.rowcount)
    conn.close()
Delete
    conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
    cursor = conn.cursor()
    cursor.execute("DELETE from Employees where ID = 1")
    conn.commit()
    print("Total number of rows deleted :", cursor.rowcount)
    conn.close()
Connecting to Oracle
Python can connect to an Oracle database with the cx_Oracle library:
    import cx_Oracle

    dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
    conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
    print("Opened Oracle database successfully")
    conn.close()
CRUD operations
Next, we show how to perform the basic CRUD operations in Oracle.
Create
    dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
    conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE Employees (ID NUMBER(10) NOT NULL PRIMARY KEY, NAME VARCHAR2(20) NOT NULL, AGE NUMBER(3), ADDRESS CHAR(50), SALARY NUMBER(10, 2))")
    conn.commit()
    print("Table created successfully")
    conn.close()
Read (Retrieve)
    dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
    conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, address, salary from Employees")
    rows = cursor.fetchall()
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("ADDRESS = ", row[2])
        print("SALARY = ", row[3])
    conn.close()
Update
    dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
    conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
    cursor = conn.cursor()
    cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
    conn.commit()
    print("Total number of rows updated :", cursor.rowcount)
    conn.close()
Delete
    dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
    conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
    cursor = conn.cursor()
    cursor.execute("DELETE from Employees where ID = 1")
    conn.commit()
    print("Total number of rows deleted :", cursor.rowcount)
    conn.close()
Connecting to PostgreSQL
Python can connect to a PostgreSQL database with the psycopg2 library:
    import psycopg2

    conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
    print("Opened PostgreSQL database successfully")
    conn.close()
CRUD operations
Next, we show how to perform the basic CRUD operations in PostgreSQL.
Create
    conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL);''')
    conn.commit()
    print("Table created successfully")
    conn.close()
Read (Retrieve)
    conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, address, salary from Employees")
    rows = cursor.fetchall()
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("ADDRESS = ", row[2])
        print("SALARY = ", row[3])
    conn.close()
Update
    conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
    cursor = conn.cursor()
    cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
    conn.commit()
    print("Total number of rows updated :", cursor.rowcount)
    conn.close()
Delete
    conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
    cursor = conn.cursor()
    cursor.execute("DELETE from Employees where ID = 1")
    conn.commit()
    print("Total number of rows deleted :", cursor.rowcount)
    conn.close()
Connecting to MongoDB
Python can connect to a MongoDB database with the pymongo library:
    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017/")
    db = client["my_database"]
    print("Opened MongoDB database successfully")
    client.close()
CRUD operations
Next, we show how to perform the basic CRUD operations in MongoDB.
Create
In MongoDB, creating a document is normally done through an insert operation:
    client = MongoClient("mongodb://localhost:27017/")
    db = client["my_database"]
    employees = db["Employees"]
    employee = {"id": "1", "name": "John", "age": "30", "address": "New York", "salary": "1000.00"}
    employees.insert_one(employee)
    print("Document inserted successfully")
    client.close()
Read (Retrieve)
    client = MongoClient("mongodb://localhost:27017/")
    db = client["my_database"]
    employees = db["Employees"]
    cursor = employees.find()
    for document in cursor:
        print(document)
    client.close()
Update
    client = MongoClient("mongodb://localhost:27017/")
    db = client["my_database"]
    employees = db["Employees"]
    query = {"id": "1"}
    new_values = {"$set": {"salary": "25000.00"}}
    employees.update_one(query, new_values)
    print("Document updated successfully")
    client.close()
Delete
    client = MongoClient("mongodb://localhost:27017/")
    db = client["my_database"]
    employees = db["Employees"]
    query = {"id": "1"}
    employees.delete_one(query)
    print("Document deleted successfully")
    client.close()
Connecting to SQLite
Python connects to a SQLite database with the built-in sqlite3 library:
    import sqlite3

    conn = sqlite3.connect('my_database.db')
    print("Opened SQLite database successfully")
    conn.close()
CRUD operations
Next, we show how to perform the basic CRUD operations in SQLite.
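Because sqlite3 ships with Python, the whole cycle can be tried without any server. The sketch below runs all four operations against an in-memory database and binds values through `?` placeholders instead of writing them into the SQL text. The `:memory:` database, the sample row, and the function name `run_crud` are assumptions made for illustration.

```python
import sqlite3


def run_crud():
    """Run create, update, read, and delete against a throwaway in-memory database."""
    conn = sqlite3.connect(":memory:")  # assumption: in-memory DB instead of my_database.db
    try:
        cursor = conn.cursor()
        cursor.execute(
            "CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, "
            "AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL)"
        )
        # Create: the values travel separately from the SQL text
        cursor.execute(
            "INSERT INTO Employees (ID, NAME, AGE, ADDRESS, SALARY) VALUES (?, ?, ?, ?, ?)",
            (1, "John", 30, "New York", 1000.00),
        )
        # Update
        cursor.execute("UPDATE Employees SET SALARY = ? WHERE ID = ?", (25000.00, 1))
        updated = cursor.rowcount
        # Read
        cursor.execute("SELECT ID, NAME, ADDRESS, SALARY FROM Employees WHERE ID = ?", (1,))
        row = cursor.fetchone()
        # Delete
        cursor.execute("DELETE FROM Employees WHERE ID = ?", (1,))
        deleted = cursor.rowcount
        conn.commit()
        return row, updated, deleted
    finally:
        # Close the connection even if one of the statements fails
        conn.close()
```

Calling `run_crud()` returns the row as it looked after the update, together with the two row counts. The other DB-API drivers in this article also support placeholders, although the marker differs between them (for example `%s` in mysql-connector-python and psycopg2).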
Create
    conn = sqlite3.connect('my_database.db')
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL);''')
    conn.commit()
    print("Table created successfully")
    conn.close()
Read (Retrieve)
    conn = sqlite3.connect('my_database.db')
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, address, salary from Employees")
    rows = cursor.fetchall()
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("ADDRESS = ", row[2])
        print("SALARY = ", row[3])
    conn.close()
Update
    conn = sqlite3.connect('my_database.db')
    cursor = conn.cursor()
    cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
    conn.commit()
    print("Total number of rows updated :", cursor.rowcount)
    conn.close()
Delete
    conn = sqlite3.connect('my_database.db')
    cursor = conn.cursor()
    cursor.execute("DELETE from Employees where ID = 1")
    conn.commit()
    print("Total number of rows deleted :", cursor.rowcount)
    conn.close()
Connecting to DB2
Python can connect to a DB2 database with the ibm_db library:
    import ibm_db

    # Single braces: this is a plain string, not a format template
    dsn = (
        "DRIVER={IBM DB2 ODBC DRIVER};"
        "DATABASE=my_database;"
        "HOSTNAME=127.0.0.1;"
        "PORT=50000;"
        "PROTOCOL=TCPIP;"
        "UID=username;"
        "PWD=password;"
    )
    conn = ibm_db.connect(dsn, "", "")
    print("Opened DB2 database successfully")
    ibm_db.close(conn)
CRUD operations
Next, we show how to perform the basic CRUD operations in DB2.
Create
    conn = ibm_db.connect(dsn, "", "")
    # No trailing semicolon: exec_immediate takes a single statement
    sql = '''CREATE TABLE Employees (ID INT NOT NULL PRIMARY KEY, NAME VARCHAR(20) NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY DECIMAL(9, 2))'''
    stmt = ibm_db.exec_immediate(conn, sql)
    print("Table created successfully")
    ibm_db.close(conn)
Read (Retrieve)
    conn = ibm_db.connect(dsn, "", "")
    sql = "SELECT id, name, address, salary from Employees"
    stmt = ibm_db.exec_immediate(conn, sql)
    while ibm_db.fetch_row(stmt):
        print("ID = ", ibm_db.result(stmt, "ID"))
        print("NAME = ", ibm_db.result(stmt, "NAME"))
        print("ADDRESS = ", ibm_db.result(stmt, "ADDRESS"))
        print("SALARY = ", ibm_db.result(stmt, "SALARY"))
    ibm_db.close(conn)
Update
    conn = ibm_db.connect(dsn, "", "")
    sql = "UPDATE Employees set SALARY = 25000.00 where ID = 1"
    stmt = ibm_db.exec_immediate(conn, sql)
    ibm_db.commit(conn)
    print("Total number of rows updated :", ibm_db.num_rows(stmt))
    ibm_db.close(conn)
Delete
    conn = ibm_db.connect(dsn, "", "")
    sql = "DELETE from Employees where ID = 1"
    stmt = ibm_db.exec_immediate(conn, sql)
    ibm_db.commit(conn)
    print("Total number of rows deleted :", ibm_db.num_rows(stmt))
    ibm_db.close(conn)
Connecting to Microsoft Access
Python can connect to a Microsoft Access database (.mdb or .accdb file) with the pyodbc library:
    import pyodbc

    conn_str = (
        r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};'
        r'DBQ=path_to_your_access_file.accdb;'
    )
    conn = pyodbc.connect(conn_str)
    print("Opened Access database successfully")
    conn.close()
CRUD operations
Next, we show how to perform the basic CRUD operations in Access.
Create
    conn = pyodbc.connect(conn_str)
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY DECIMAL(9, 2));''')
    conn.commit()
    print("Table created successfully")
    conn.close()
Read (Retrieve)
    conn = pyodbc.connect(conn_str)
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, address, salary from Employees")
    rows = cursor.fetchall()
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("ADDRESS = ", row[2])
        print("SALARY = ", row[3])
    conn.close()
Update
    conn = pyodbc.connect(conn_str)
    cursor = conn.cursor()
    cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
    conn.commit()
    print("Total number of rows updated :", cursor.rowcount)
    conn.close()
Delete
    conn = pyodbc.connect(conn_str)
    cursor = conn.cursor()
    cursor.execute("DELETE from Employees where ID = 1")
    conn.commit()
    print("Total number of rows deleted :", cursor.rowcount)
    conn.close()
Connecting to Cassandra
Python can connect to a Cassandra database with the cassandra-driver library:
    from cassandra.cluster import Cluster

    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect('my_keyspace')
    print("Opened Cassandra database successfully")
    cluster.shutdown()
CRUD operations
Next, we show how to perform the basic CRUD operations in Cassandra.
Create
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect('my_keyspace')
    session.execute("""
        CREATE TABLE Employees (
            id int PRIMARY KEY,
            name text,
            age int,
            address text,
            salary decimal
        )
    """)
    print("Table created successfully")
    cluster.shutdown()
Read (Retrieve)
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect('my_keyspace')
    rows = session.execute('SELECT id, name, address, salary FROM Employees')
    for row in rows:
        print("ID = ", row.id)
        print("NAME = ", row.name)
        print("ADDRESS = ", row.address)
        print("SALARY = ", row.salary)
    cluster.shutdown()
Update
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect('my_keyspace')
    session.execute("UPDATE Employees SET salary = 25000.00 WHERE id = 1")
    print("Row updated successfully")
    cluster.shutdown()
Delete
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect('my_keyspace')
    session.execute("DELETE FROM Employees WHERE id = 1")
    print("Row deleted successfully")
    cluster.shutdown()
Connecting to Redis
Python can connect to a Redis database with the redis-py library:
    import redis

    r = redis.Redis(host='localhost', port=6379, db=0)
    print("Opened Redis database successfully")
CRUD operations
Next, we show how to perform the basic CRUD operations in Redis.
Create
    r = redis.Redis(host='localhost', port=6379, db=0)
    r.set('employee:1:name', 'John')
    r.set('employee:1:age', '30')
    r.set('employee:1:address', 'New York')
    r.set('employee:1:salary', '1000.00')
    print("Keys created successfully")
Read (Retrieve)
    r = redis.Redis(host='localhost', port=6379, db=0)
    # get() returns bytes, so decode before printing
    print("NAME = ", r.get('employee:1:name').decode('utf-8'))
    print("AGE = ", r.get('employee:1:age').decode('utf-8'))
    print("ADDRESS = ", r.get('employee:1:address').decode('utf-8'))
    print("SALARY = ", r.get('employee:1:salary').decode('utf-8'))
Update
    r = redis.Redis(host='localhost', port=6379, db=0)
    r.set('employee:1:salary', '25000.00')
    print("Key updated successfully")
Delete
    r = redis.Redis(host='localhost', port=6379, db=0)
    r.delete('employee:1:name', 'employee:1:age', 'employee:1:address', 'employee:1:salary')
    print("Keys deleted successfully")
Connecting to ElasticSearch
Python can connect to ElasticSearch with the elasticsearch library:
    from elasticsearch import Elasticsearch

    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    print("Opened ElasticSearch database successfully")
CRUD operations
Next, we show how to perform the basic CRUD operations in ElasticSearch.
Create
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    employee = {
        'name': 'John',
        'age': 30,
        'address': 'New York',
        'salary': 1000.00
    }
    # doc_type is accepted by 6.x/7.x clients; 8.x clients removed it
    res = es.index(index='employees', doc_type='employee', id=1, body=employee)
    print("Document created successfully")
Read (Retrieve)
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    res = es.get(index='employees', doc_type='employee', id=1)
    print("Document details:")
    for field, details in res['_source'].items():
        print(f"{field.upper()} = ", details)
Update
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    res = es.update(index='employees', doc_type='employee', id=1, body={
        'doc': {
            'salary': 25000.00
        }
    })
    print("Document updated successfully")
Delete
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    res = es.delete(index='employees', doc_type='employee', id=1)
    print("Document deleted successfully")
Connecting to Neo4j
Python can connect to a Neo4j database with the neo4j library:
    from neo4j import GraphDatabase

    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    print("Opened Neo4j database successfully")
    driver.close()
CRUD operations
Next, we show how to perform the basic CRUD operations in Neo4j.
Create
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    with driver.session() as session:
        session.run("CREATE (:Employee {id: 1, name: 'John', age: 30, address: 'New York', salary: 1000.00})")
    print("Node created successfully")
    driver.close()
Read (Retrieve)
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    with driver.session() as session:
        result = session.run("MATCH (n:Employee) WHERE n.id = 1 RETURN n")
        for record in result:
            print("ID = ", record["n"]["id"])
            print("NAME = ", record["n"]["name"])
            print("ADDRESS = ", record["n"]["address"])
            print("SALARY = ", record["n"]["salary"])
    driver.close()
Update
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    with driver.session() as session:
        session.run("MATCH (n:Employee) WHERE n.id = 1 SET n.salary = 25000.00")
    print("Node updated successfully")
    driver.close()
Delete
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    with driver.session() as session:
        session.run("MATCH (n:Employee) WHERE n.id = 1 DETACH DELETE n")
    print("Node deleted successfully")
    driver.close()
Connecting to InfluxDB
Python can connect to an InfluxDB database with the InfluxDB-Python library:
    from influxdb import InfluxDBClient

    client = InfluxDBClient(host='localhost', port=8086)
    print("Opened InfluxDB database successfully")
    client.close()
CRUD operations
Next, we show how to perform the basic CRUD operations in InfluxDB.
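Create
    # Writes and queries need a target database; 'my_database' is a placeholder
    client = InfluxDBClient(host='localhost', port=8086, database='my_database')
    json_body = [
        {
            "measurement": "employees",
            "tags": {
                "id": "1"
            },
            "fields": {
                "name": "John",
                "age": 30,
                "address": "New York",
                "salary": 1000.00
            }
        }
    ]
    client.write_points(json_body)
    print("Point created successfully")
    client.close()
Read (Retrieve)
    client = InfluxDBClient(host='localhost', port=8086, database='my_database')
    # The "id" tag has to be selected explicitly, otherwise point['id'] is missing
    result = client.query('SELECT "id", "name", "age", "address", "salary" FROM "employees"')
    for point in result.get_points():
        print("ID = ", point['id'])
        print("NAME = ", point['name'])
        print("AGE = ", point['age'])
        print("ADDRESS = ", point['address'])
        print("SALARY = ", point['salary'])
    client.close()
Update
InfluxDB's data model differs from that of the other databases here: it has no update operation. You can get the same effect, however, by writing a point with the same measurement, tags, and timestamp as an existing one and with different field values; the new field values replace the old ones.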
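The overwrite behaviour described above can be sketched as follows. The helper builds a point with an explicit timestamp and hands it to any object that offers `write_points`, such as the `InfluxDBClient` used earlier. The function names `build_point` and `overwrite_salary`, the timestamp value, and the recording stand-in client are assumptions made for illustration so that the example runs without a server.

```python
def build_point(employee_id, timestamp, **fields):
    """Build a point dict in the shape passed to write_points."""
    return {
        "measurement": "employees",
        "tags": {"id": str(employee_id)},
        "time": timestamp,  # same measurement + tags + timestamp addresses the same point
        "fields": fields,
    }


def overwrite_salary(client, employee_id, timestamp, new_salary):
    """Rewrite the salary field of an existing point by writing it again."""
    point = build_point(employee_id, timestamp, salary=float(new_salary))
    client.write_points([point])
    return point


class RecordingClient:
    """Hypothetical stand-in for InfluxDBClient that only records what is written."""

    def __init__(self):
        self.written = []

    def write_points(self, points):
        self.written.extend(points)
        return True


demo_client = RecordingClient()
overwrite_salary(demo_client, 1, "2023-01-01T00:00:00Z", 25000.00)
print(demo_client.written[0]["fields"])
```

With a real client, the timestamp has to be the one stored on the original point; a point written with a different timestamp is added as a new point rather than replacing the old one.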
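Delete
Similarly, InfluxDB has no dedicated operation for deleting a single data point. You can, however, drop the series of a measurement (the rough equivalent of a table) or delete the data in a time range.
    client = InfluxDBClient(host='localhost', port=8086, database='my_database')
    # Drop every series in the measurement
    client.query('DROP SERIES FROM "employees"')
    # Or delete only the data in a time range
    # client.query('DELETE FROM "employees" WHERE time < now() - 1d')
    print("Series deleted successfully")
    client.close()
Connecting to Snowflake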
Python can connect to a Snowflake database with the snowflake-connector-python library:
    from snowflake.connector import connect

    con = connect(
        user='username',
        password='password',
        account='account_url',
        warehouse='warehouse',
        database='database',
        schema='schema'
    )
    print("Opened Snowflake database successfully")
    con.close()
CRUD operations
Next, we show how to perform the basic CRUD operations in Snowflake.
Create
    con = connect(user='username', password='password', account='account_url', warehouse='warehouse', database='database', schema='schema')
    cur = con.cursor()
    cur.execute("""CREATE TABLE EMPLOYEES (
        ID INT,
        NAME STRING,
        AGE INT,
        ADDRESS STRING,
        SALARY FLOAT
    )""")
    cur.execute("""INSERT INTO EMPLOYEES (ID, NAME, AGE, ADDRESS, SALARY)
        VALUES (1, 'John', 30, 'New York', 1000.00)""")
    print("Table created and row inserted successfully")
    con.close()
Read (Retrieve)
    con = connect(user='username', password='password', account='account_url', warehouse='warehouse', database='database', schema='schema')
    cur = con.cursor()
    cur.execute("SELECT * FROM EMPLOYEES WHERE ID = 1")
    rows = cur.fetchall()
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("AGE = ", row[2])
        print("ADDRESS = ", row[3])
        print("SALARY = ", row[4])
    con.close()
Update
    con = connect(user='username', password='password', account='account_url', warehouse='warehouse', database='database', schema='schema')
    cur = con.cursor()
    cur.execute("UPDATE EMPLOYEES SET SALARY = 25000.00 WHERE ID = 1")
    print("Row updated successfully")
    con.close()
Delete
    con = connect(user='username', password='password', account='account_url', warehouse='warehouse', database='database', schema='schema')
    cur = con.cursor()
    cur.execute("DELETE FROM EMPLOYEES WHERE ID = 1")
    print("Row deleted successfully")
    con.close()
Connecting to Amazon DynamoDB
Python can connect to Amazon DynamoDB with the boto3 library:
    import boto3

    dynamodb = boto3.resource(
        'dynamodb',
        region_name='us-west-2',
        aws_access_key_id='Your AWS Access Key',
        aws_secret_access_key='Your AWS Secret Key'
    )
    print("Opened DynamoDB successfully")
CRUD operations
Next, we show how to perform the basic CRUD operations in DynamoDB.
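One detail matters before writing items: the boto3 resource layer does not accept Python `float` values and expects numbers as `decimal.Decimal`. The helper below is a minimal sketch of converting an item before it is passed to `put_item` or `update_item`; the function name `to_dynamodb_numbers` is hypothetical and not part of boto3.

```python
from decimal import Decimal


def to_dynamodb_numbers(value):
    """Recursively replace floats with Decimal so the item can be serialized."""
    if isinstance(value, float):
        # Convert via str() to keep the short decimal form of the float
        return Decimal(str(value))
    if isinstance(value, dict):
        return {key: to_dynamodb_numbers(item) for key, item in value.items()}
    if isinstance(value, list):
        return [to_dynamodb_numbers(item) for item in value]
    # Strings, ints, and everything else pass through unchanged
    return value


item = to_dynamodb_numbers(
    {"id": 1, "name": "John", "age": 30, "address": "New York", "salary": 1000.00}
)
print(item["salary"])
```

The converted dictionary can then be used as the `Item` argument of `table.put_item`.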
Create
    from decimal import Decimal

    table = dynamodb.create_table(
        TableName='Employees',
        KeySchema=[
            {'AttributeName': 'id', 'KeyType': 'HASH'},
        ],
        AttributeDefinitions=[
            {'AttributeName': 'id', 'AttributeType': 'N'},
        ],
        ProvisionedThroughput={'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5}
    )
    # create_table returns before the table is ready; wait before writing to it
    table.wait_until_exists()
    table.put_item(
        Item={
            'id': 1,
            'name': 'John',
            'age': 30,
            'address': 'New York',
            'salary': Decimal('1000.00')  # boto3 rejects float; numbers must be Decimal
        }
    )
    print("Table created and item inserted successfully")
Read (Retrieve)
    table = dynamodb.Table('Employees')
    response = table.get_item(Key={'id': 1})
    item = response['Item']
    print(item)
Update
    from decimal import Decimal

    table = dynamodb.Table('Employees')
    table.update_item(
        Key={'id': 1},
        UpdateExpression='SET salary = :val1',
        ExpressionAttributeValues={':val1': Decimal('25000.00')}
    )
    print("Item updated successfully")
Delete
    table = dynamodb.Table('Employees')
    table.delete_item(Key={'id': 1})
    print("Item deleted successfully")
Connecting to Microsoft Azure Cosmos DB
Python can connect to Microsoft Azure Cosmos DB with the azure-cosmos library:
    from azure.cosmos import CosmosClient, PartitionKey, exceptions

    url = 'Cosmos DB Account URL'
    key = 'Cosmos DB Account Key'
    client = CosmosClient(url, credential=key)
    database_name = 'testDB'
    database = client.get_database_client(database_name)
    container_name = 'Employees'
    container = database.get_container_client(container_name)
    print("Opened Cosmos DB successfully")
CRUD operations
Next, we show how to perform the basic CRUD operations in Cosmos DB.
Create
    database = client.create_database_if_not_exists(id=database_name)
    container = database.create_container_if_not_exists(
        id=container_name,
        partition_key=PartitionKey(path="/id"),
        offer_throughput=400
    )
    container.upsert_item({
        'id': '1',
        'name': 'John',
        'age': 30,
        'address': 'New York',
        'salary': 1000.00
    })
    print("Container created and item upserted successfully")
Read (Retrieve)
    for item in container.read_all_items():
        print(item)
Update
    for item in container.read_all_items():
        if item['id'] == '1':
            item['salary'] = 25000.00
            container.upsert_item(item)
    print("Item updated successfully")
Delete
    for item in container.read_all_items():
        if item['id'] == '1':
            container.delete_item(item, partition_key='1')
    print("Item deleted successfully")
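The update and delete loops above scan every item in the container to find one document. When the id and partition key are already known, a point read avoids the scan. The sketch below assumes the `read_item`, `replace_item`, and `delete_item` methods of the azure-cosmos container client; the helper names and the in-memory `FakeContainer` used to exercise them without an Azure account are hypothetical.

```python
def update_salary(container, item_id, new_salary):
    """Read one item by id and partition key, change its salary, write it back."""
    # The container is partitioned on /id, so the id doubles as the partition key
    doc = container.read_item(item=item_id, partition_key=item_id)
    doc["salary"] = new_salary
    return container.replace_item(item=item_id, body=doc)


def delete_employee(container, item_id):
    """Delete one item directly, without iterating over the container."""
    container.delete_item(item=item_id, partition_key=item_id)


class FakeContainer:
    """Hypothetical in-memory stand-in offering the three calls used above."""

    def __init__(self, docs):
        self.docs = {doc["id"]: dict(doc) for doc in docs}

    def read_item(self, item, partition_key):
        return dict(self.docs[item])

    def replace_item(self, item, body):
        self.docs[item] = dict(body)
        return dict(body)

    def delete_item(self, item, partition_key):
        del self.docs[item]


fake = FakeContainer([{"id": "1", "name": "John", "salary": 1000.00}])
print(update_salary(fake, "1", 25000.00))
delete_employee(fake, "1")
```

Against a real account, the same two helpers would be called with the `container` object obtained earlier in place of `fake`.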