Home 用 Python 自動抓取期交所選擇權數據
Post
Cancel

用 Python 自動抓取期交所選擇權數據

大家好久不見! 今天跟各位分享如何使用 Python 自動化抓取 期交所選擇權成交數據 (年度行情),並整理數據。

pic1

使用 Python 抓取數據

首先載入基本套件:

1
2
3
4
5
6
7
8
9
10
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import requests
from bs4 import BeautifulSoup
import datetime as dt
from pandas.tseries.offsets import BDay

todayStr = dt.datetime.today().strftime('%Y-%m-%d')
thisYear = int(todayStr[:4]) # 取得目前年份

接著,需要讓 Python 自動填入契約年,並按下下載鍵:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import Select
import time


for year in range(2001, thisYear):
    '''
    爬取台灣期貨交易所網頁,抓取年度台指選數據 (起始年份設為2001~目前日期的前一年)
    '''
    driver = webdriver.Edge()
    driver.get(f'https://www.taifex.com.tw/cht/3/optDailyMarketView')
    driver.maximize_window()

    grbf = Select(driver.find_element(By.CSS_SELECTOR, f"select[name='his_year")) # 找尋輸入年度行情的按鈕
    grbf.select_by_value(str(year))

    downloadButton = driver.find_element(By.CSS_SELECTOR, f"input[name='button9") # 下載鍵

    action = webdriver.common.action_chains.ActionChains(driver)
    action.move_to_element(downloadButton)
    action.click()
    action.perform()
    time.sleep(0.5)

檔案整理

使用以上程式碼自動下載數據後,發現是壓縮檔,進一步將資料解壓縮:

1
2
3
4
5
import zipfile

for i in range(2001, thisYear):
    with zipfile.ZipFile(rf'C:\Users\Blair Lin\quantTWStock\data\opt_data\{i}_opt.zip', 'r') as zip_ref:
        zip_ref.extractall(r'C:\Users\Blair Lin\quantTWStock\data\TXO_yearlyData')

數據整理

期交所提供的資料有些需要特別處理,其中:

  • 2001年數據只有幾天故不納入分析
  • 2015年數據分成四季
  • 2016年開始分月份,
  • 2017年開始每月資料分為兩分 (上下月,推測是因為夜盤導致資料變多)

處理方式如下,這裡注意到由於期交所提供的csv檔column名稱含有中文,在使用.read_csv()時記得使用可解析中文的Big5編碼:

  • 2002~2014 的數據
1
2
3
4
5
6
7
8
9
df = pd.DataFrame()

# phase 1: 2002~2014
phase1Df = pd.DataFrame()
for year in range(2002, 2015):
    tmp = pd.read_csv(rf'C:\Users\Blair Lin\quantTWStock\data\TXO_yearlyData\{year}_opt.csv', encoding='Big5')
    tmp = tmp[tmp['契約'] == 'TXO']
    phase1Df = pd.concat([phase1Df, tmp])

  • 2015~2017 的數據
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# phase 2: 2015~2017
phase2Df = pd.DataFrame()

for year in range(2015, 2018):
    if year == 2015:
        for m in range(1, 5):
            tmp = pd.read_csv(rf'C:\Users\Blair Lin\quantTWStock\data\TXO_yearlyData\{year}_{m}_opt.csv', encoding='Big5')
            tmp = tmp[ (tmp['契約'] == 'TXO') & np.isnan(tmp['是否因訊息面暫停交易']) == True].drop(columns=['是否因訊息面暫停交易'])
            phase2Df = pd.concat([phase2Df, tmp])
        # print(phase2Df)

    if year == 2016:
        for m in range(1, 13):
            tmp = pd.read_csv(rf'C:\Users\Blair Lin\quantTWStock\data\TXO_yearlyData\{year}_opt_{m}.csv', encoding='Big5')
            tmp = tmp[ (tmp['契約'] == 'TXO') & np.isnan(tmp['是否因訊息面暫停交易']) == True].drop(columns=['是否因訊息面暫停交易'])
            phase2Df = pd.concat([phase2Df, tmp])
    
    if year == 2017:
        for m in range(1, 13):
            if m <= 5:
                tmp = pd.read_csv(rf'C:\Users\Blair Lin\quantTWStock\data\TXO_yearlyData\{year}_opt_{m}.csv', encoding='Big5')
                if m == 4:
                    tmp = tmp[tmp['是否因訊息面暫停交易'] == '*'].replace('*', np.nan)
                    tmp = tmp[ (tmp['契約'] == 'TXO') & (np.isnan(tmp['是否因訊息面暫停交易']) == True)].drop(columns=['是否因訊息面暫停交易'])
                    phase2Df = pd.concat([phase2Df, tmp])
                else:
                    tmp = tmp[ (tmp['契約'] == 'TXO') & (np.isnan(tmp['是否因訊息面暫停交易']) == True)].drop(columns=['是否因訊息面暫停交易'])
                    phase2Df = pd.concat([phase2Df, tmp])
            elif m >= 6: # 2017.06 加入夜盤
                for i in range(1, 3):
                    try:
                        tmp = pd.read_csv(rf'C:\Users\Blair Lin\quantTWStock\data\TXO_yearlyData\{year}_opt_{m}_{i}.csv', encoding='Big5')
                        tmp = tmp[ (tmp['契約'] == 'TXO') & (np.isnan(tmp['是否因訊息面暫停交易']) == True) & (tmp['交易時段'] == '一般')].drop(columns=['是否因訊息面暫停交易', '交易時段'])
                        phase2Df = pd.concat([phase2Df, tmp])
                    except Exception as e:
                        tmp = tmp[tmp['是否因訊息面暫停交易'] == '*'].replace('*', np.nan)
                        tmp = tmp[ (tmp['契約'] == 'TXO') & (np.isnan(tmp['是否因訊息面暫停交易']) == True) & (tmp['交易時段'] == '一般')].drop(columns=['是否因訊息面暫停交易', '交易時段'])
                        phase2Df = pd.concat([phase2Df, tmp])

  • 2018~2023 的數據
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# phase 3: 2018~2023
phase3Df = pd.DataFrame()

for year in range(2018, thisYear):
    for m in range(1, 13):
        # print(f'{year}{m}')
        try:
            file = rf'C:\Users\Blair Lin\quantTWStock\data\TXO_yearlyData\{year}_opt_0{m}.csv' if m <= 9 else rf'C:\Users\Blair Lin\quantTWStock\data\TXO_yearlyData\{year}_opt_{m}.csv'
            tmp = pd.read_csv(file, encoding='Big5', index_col=False)
            tmp = tmp[ (tmp['契約'] == 'TXO') & (np.isnan(tmp['是否因訊息面暫停交易']) == True) & (tmp['交易時段'] == '一般')].drop(columns=['是否因訊息面暫停交易', '交易時段'])
            phase3Df = pd.concat([phase3Df, tmp])
        except Exception as e:
            tmp = tmp[tmp['是否因訊息面暫停交易'] == '*'].replace('*', np.nan)
            tmp = tmp[ (tmp['契約'] == 'TXO') & (np.isnan(tmp['是否因訊息面暫停交易']) == True) & (tmp['交易時段'] == '一般')].drop(columns=['是否因訊息面暫停交易', '交易時段'])
            phase3Df = pd.concat([phase3Df, tmp])

  • 將三個階段數據彙整:
1
2
3
finalDf = pd.concat([phase1Df, phase2Df, phase3Df]).drop(columns=['交易時段', '漲跌價', '漲跌%'])
finalDf['交易日期'] = pd.to_datetime(finalDf['交易日期'], format = '%Y/%m/%d')
finalDf.to_csv('2002_2023_TXO.csv')

本篇文章就到這裡,感謝觀看!!!

This post is licensed under CC BY 4.0 by the author.