REM Launcher for the scraper: runs the Python script through PowerShell.
REM PAUSE waits for a key press, so the console stays open before and after the run.
PAUSE
PowerShell.exe -Command "python .\scrape_fear_idex.py"
PAUSE
"""Scrape a Fear & Greed index gauge with Selenium and store samples in SQLite.

Once a minute the script reads the gauge timestamp and index value from the
page at ``TARGET_URL`` and periodically appends the collected samples to two
tables in ``FearAndGreedyIndex.db``: the given table and a per-day table.
"""

import datetime
import os
import sqlite3
import time

import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver import Keys
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By

DRIVER_PATH = "chromedriver.exe"
# NOTE(review): the URL literal was lost from the source this file was
# recovered from. Presumably it is the page exposing the
# 'market-fng-gauge__*' elements used below -- fill in and confirm.
TARGET_URL = ""
DB_PATH = "FearAndGreedyIndex.db"


def save_data(time_index_list, table_name):
    """Append samples to ``table_name``, creating the table if it is missing.

    Args:
        time_index_list: list of ``(timestamp: int, date_time: str, idx: int)``
            tuples to insert.
        table_name: name of the SQLite table. It is interpolated into the SQL
            text (identifiers cannot be bound as parameters), so it must come
            from trusted code, never from user input.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?;",
            (table_name,),
        )
        if cur.fetchone() is None:
            cur.execute(
                f"CREATE TABLE {table_name} "
                "(time_stamp INTEGER, date_time TEXT, idx_data INTEGER);"
            )
            conn.commit()
            print('database and table created...')
        else:
            print('database and table already created...')
        cur.executemany(
            f"INSERT INTO {table_name} VALUES (?,?,?);", time_index_list
        )
        conn.commit()
    finally:
        # Close the connection even when a statement fails.
        conn.close()
    print('data saved...')
    print('--------->')


def _read_sample(driver):
    """Return ``(timestamp, index)`` read from the page, both as ints.

    A missing or empty value is recorded as 0. Raises whatever Selenium
    raises when an element is absent, or ``ValueError`` when a value is not
    an integer string.
    """
    time_em = driver.find_element(By.CLASS_NAME, 'market-fng-gauge__timestamp')
    raw_timestamp = time_em.get_attribute("data-timestamp")
    index_em = driver.find_element(
        By.CLASS_NAME, 'market-fng-gauge__dial-number-value'
    )
    # WebElement.text is read-only, so fall back on the value read from it
    # instead of trying to assign to the element.
    raw_index = index_em.text
    return int(raw_timestamp or 0), int(raw_index or 0)


def _save_batch(rows, table_name):
    """Write ``rows`` to the per-day table and to ``table_name``."""
    daily_table = table_name + '_' + datetime.datetime.now().strftime("%d_%m_%Y")
    save_data(rows, daily_table)
    save_data(rows, table_name)


def get_time_index_list(hours, table_name):
    """Sample the index once a minute for ``hours`` hours and persist it.

    Args:
        hours: how long to run, in hours (``hours * 60`` samples are attempted).
        table_name: SQLite table to save to; a second table named
            ``<table_name>_<dd_mm_YYYY>`` receives the same rows.

    Returns:
        The list of all ``(timestamp, date_time, index)`` tuples collected.

    Raises:
        ValueError: if ``TARGET_URL`` has not been configured.
    """
    if not TARGET_URL:
        raise ValueError("TARGET_URL is not set; configure the page to scrape.")

    # Selenium 4 takes the driver path through a Service object; the
    # executable_path keyword was removed in later 4.x releases.
    driver = webdriver.Chrome(service=Service(DRIVER_PATH))
    pending = []    # samples not yet written to the database
    collected = []  # every sample taken during this run
    try:
        driver.maximize_window()
        driver.get(TARGET_URL)
        time.sleep(5)  # wait for the page to load
        print('web drive launched...')
        time.sleep(1)
        print('--------->')
        time.sleep(5)

        for i in range(int(hours * 60)):
            try:
                timestamp, index_value = _read_sample(driver)
            except Exception as exc:
                # Best effort: any read/parse failure skips this sample
                # rather than aborting a multi-hour run.
                print("An exception occurred, skip to next run in 60s.")
                print(f"cause: {exc!r}")
                driver.refresh()
                time.sleep(60)
                continue

            current_date_time = datetime.datetime.now().strftime(
                "%d-%m-%Y %H:%M:%S"
            )
            time_index = (timestamp, current_date_time, index_value)
            pending.append(time_index)

            # Save every 10 minutes, then clear the buffer so rows are not
            # inserted twice.
            if i % 10 == 0 and i > 0:
                _save_batch(pending, table_name)
                pending = []

            print(time_index)  # log the current sample
            collected.append(time_index)
            time.sleep(60)

        print('Scrape Completed')
    finally:
        try:
            # Persist samples taken since the last periodic save; without
            # this the tail of every run (or an interrupted run) is lost.
            if pending:
                _save_batch(pending, table_name)
        finally:
            driver.quit()
            print('web drive terminated')
    return collected


if __name__ == "__main__":
    # Input: hours to run, table name to save to.
    get_time_index_list(8, "index_data")
标签: #python中tmp