期货日数据维护与使用_日数据维护_日数据更新

2024-01-07 17:55:15

目录

写在前面:

下载日数据

下载“新增合约”日数据

下载“待更新合约”日数据

日数据文件

“选择日数据所在目录”按钮点击?

?“执行”按钮点击

sqlite3代码

按钮点击后执行的代码

子线程代码


写在前面:

本文默认已经创建了项目,如果不知道如何创建一个空项目的,请参看以下两篇博文

PyQt5将项目搬到一个新的虚拟环境中
https://blog.csdn.net/m0_37967652/article/details/122625280
python_PyQt5开发工具结构基础
https://blog.csdn.net/m0_37967652/article/details/131969032

前序:

【期货日数据维护与使用_日数据维护_sqlite3数据库创建】博文

【期货日数据维护与使用_日数据维护_界面代码】博文

【期货日数据维护与使用_日数据维护_合约更新】博文

下载日数据

下载“新增合约”日数据

“新增合约”列表下载后在 new.json 中

在优矿中执行的代码?

ticker_list = ["fu2501", "lu2501", "sc2501"]
df = DataAPI.MktFutdGet(secID=u"",ticker=ticker_list,tradeDate=u"",beginDate=u"",endDate=u"",exchangeCD="",field=u"",pandas="1")
df.to_csv('daily_20240106_00.csv',encoding='utf-8')

下载“待更新合约”日数据

“待更新合约”列表下载后在 {日期}.json,可能会有多个{日期}.json

在优矿中执行的代码

ticker_list = ["AP401", "AP403", "AP404", "AP405", ...]
df = DataAPI.MktFutdGet(secID=u"",ticker=ticker_list,tradeDate=u"",beginDate=u"20240103",endDate=u"",exchangeCD="",field=u"",pandas="1")
df.to_csv('daily_20240106_11.csv',encoding='utf-8')

日数据文件

?日数据字段列表:

['secID','ticker','exchangeCD','secShortName','tradeDate','contractObject','contractMark','preSettlePrice','preClosePrice','openPrice','highestPrice','lowestPrice','closePrice','settlePrice','turnoverVol','turnoverValue','openInt','CHG','CHG1','CHGPct','mainCon','smainCon','limitUpPrice','limitDownPrice']

想知道字段含义,可以自行到优矿网站查看

“选择日数据所在目录”按钮点击?

    def choice_daily_dir_btn_clicked(self):
        path = QtWidgets.QFileDialog.getExistingDirectory(
            self,
            '选择日数据所在目录',
            SQLITE_FROM_DIR
        )
        if not path:
            return
        self.choice_daily_dir_lineedit.setText(path)
        pass

?“执行”按钮点击

更新日数据的过程在子线程中进行,任务表示定义为 ?self.mark_str_step_two 常量,常量定义在 init_data()方法中

更新日数据逻辑:
1 将下载所得的日数据文件合并到一个 pd.DataFrame 中
2 从 t_symbol_basemsg 中获取到 日数据包含的合约名的品种、交割年份信息
3 将日数据追加到合约的csv文件,如果是新增的合约,创建新的csv文件;同时也把日数据插入到 t_last30_daily
4 更新 t_online_symbol 数据
5 计算主力合约,按品种逐一进行计算,并更新csv文件
5.1 主力合约换月规则参照文化财经的换月规则
5.2 从 t_last30_daily 获取某品种最近的日数据
5.3 如果某日只有一个合约,该合约为主力合约
5.4 计算某日 “成交量最大” 和 “持仓量最大” 的合约
5.5 如果“成交量最大”和“持仓量最大”的合约是同一个,并且与之前的主力合约也是同一个,那主力合约延续,不切换
5.6 如果“成交量最大”和“持仓量最大”的合约是同一个,但与之前的主力合约不是同一个,那预备下一个交易日主力合约切换为今日同时“成交量最大”和“持仓量最大”的合约,到下一个交易日,如果预备切换的合约依然是同时“成交量最大”和“持仓量最大”,切换,如果不是,不切换,主力合约延续
5.7 如果“成交量最大”和“持仓量最大”的合约不是同一个,那只要“成交量最大”是之前的主力合约,或“持仓量最大”是之前的主力合约,主力合约延续,不切换
5.8 如果“成交量最大”和“持仓量最大”的合约不是同一个,且都不是之前的主力合约,那主力合约取“成交量最大”的合约作为新的主力合约,当日切换

sqlite3代码

t_symbol_basemsg 查询日数据合约对应的合约信息

def query_pro_ticker_deliYear_from_symbol_basemsg_by_query(tickers:List,deliYear:int):
    conn = sqlite3.connect(YOUKUANG_DB_NAME)
    c = conn.cursor()
    # s
    ticker_str = '\',\''.join(tickers)
    ticker_str = '\'' + ticker_str + '\''
    sql_str = '''
            select product_code,ticker,deliYear from t_symbol_basemsg where ticker in ({tickers}) and deliYear>={deliYear}
            '''.format(tickers=ticker_str,deliYear=deliYear)
    c.execute(sql_str)
    res_list = c.fetchall()
    # e
    conn.commit()
    conn.close()
    return res_list

t_last30_daily 批量插入数据

def batch_insert_last30_daily(pre_list: List):
    conn = sqlite3.connect(YOUKUANG_DB_NAME)
    c = conn.cursor()
    # s
    sql_str = '''
    insert into t_last30_daily values (?,?,?,?,?,?,?,?,?,?,?,?)
    '''
    c.executemany(sql_str,pre_list)
    # e
    conn.commit()
    conn.close()
    pass

t_online_symbol 清空表格数据

def delete_all_data_of_online_symbol():
    conn = sqlite3.connect(YOUKUANG_DB_NAME)
    c = conn.cursor()
    # s
    sql_str = '''
    delete from t_online_symbol
    '''
    c.execute(sql_str)
    # e
    conn.commit()
    conn.close()
    pass

t_online_symbol 批量插入数据

def batch_insert_online_symbol(pre_list:List):
    conn = sqlite3.connect(YOUKUANG_DB_NAME)
    c = conn.cursor()
    # s
    sql_str = '''
                insert into t_online_symbol values (?,?,?,?)
                '''
    c.executemany(sql_str, pre_list)
    # e
    conn.commit()
    conn.close()
    pass

t_product 查询所有品种

def query_all_product_codes_of_product():
    conn = sqlite3.connect(YOUKUANG_DB_NAME)
    c = conn.cursor()
    # s
    sql_str = '''
                select code from t_product
                '''
    c.execute(sql_str)
    res_list = c.fetchall()
    # e
    conn.commit()
    conn.close()
    res_list00 = []
    for item in res_list:
        res_list00.append(item[0])
    return res_list00

t_last30_daily 查询某品种最近的日数据

def query_daily_by_pro_in_last30_daily(product_code:str):
    conn = sqlite3.connect(YOUKUANG_DB_NAME)
    c = conn.cursor()
    # s
    product_code_str = '\'' + product_code + '\''
    sql_str = '''
                select product_code,ticker,deliYear,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,settlePrice,turnoverVol,turnoverValue,openInt from t_last30_daily where product_code={product_code}
                '''.format(product_code=product_code_str)
    c.execute(sql_str)
    res_list = c.fetchall()
    # e
    conn.commit()
    conn.close()
    return res_list

t_main_symbol 新增品种就插入,已有品种如果主力合约切换就更新

def input_main_symbol(product_code:str,ticker:str,deliYear:int,start_date:str=None):
    conn = sqlite3.connect(YOUKUANG_DB_NAME)
    c = conn.cursor()
    # s
    product_code_str = '\''+product_code+'\''
    sql_str_query = '''
    select * from t_main_symbol where product_code={product_code}
    '''.format(product_code=product_code_str)
    c.execute(sql_str_query)
    res_one = c.fetchone()
    if len(res_one)<=0:
        # 新增品种插入
        pre_one = [product_code,ticker,deliYear,start_date]
        one_insert_main_symbol(pre_one)
        pass
    else:
        # 已有品种修改
        update_main_symbol(product_code,ticker,deliYear,start_date)
        pass
    # e
    conn.commit()
    conn.close()
    pass

按钮点击后执行的代码

    def excute_step_two_btn_clicked(self):
        dir_path = self.choice_daily_dir_lineedit.text()
        if len(dir_path) <= 0:
            QtWidgets.QMessageBox.information(
                self,
                '提示',
                '请选择日数据所在目录',
                QtWidgets.QMessageBox.Yes
            )
            return
        pre_map = {
            'dir_path': dir_path,
            'cur_deliYear': self.deliYear_spinbox.value()
        }
        self.start_caculate_thread(self.mark_str_step_two, pre_map)
        pass

子线程代码

子线程中任务名为 self.mark_str_step_two?的执行代码,于?running_caculate_thread 方法中对应任务名下

dir_path = data['dir_path']
cur_deliYear = data['cur_deliYear']

file_list = os.listdir(dir_path)
if len(file_list)<=0:
    self.thread_out_log('Error Error 没有日数据文件')
    pre_map = {
        'mark_str':self.mark_str_error,
        'data':'Error Error 没有日数据文件'
    }
    self.signal_excute.emit(pre_map)
    return
# 将下载的日数据合并到同一个df中
df = pd.DataFrame()
for item in file_list:
    file_path = dir_path + os.path.sep + item
    df_one = pd.read_csv(file_path,encoding='utf-8')
    if not df_one.columns.isin(self.d_csv_column_list).any():
        self.thread_out_log(f"{item},文件不是日数据文件")
        continue
    df = pd.concat([df,df_one])
    pass
if len(df)<=0:
    self.thread_out_log('Error Error 没有要更新的日数据')
    pre_map = {
        'mark_str': self.mark_str_error,
        'data': 'Error Error 没有要更新的日数据'
    }
    self.signal_excute.emit(pre_map)
    return

pre_add_tickers = df['ticker'].unique()
ticker_list = sqlite_tool.query_pro_ticker_deliYear_from_symbol_basemsg_by_query(pre_add_tickers,cur_deliYear)
ticker_map = {}
for item in ticker_list:
    ticker_map[item[1]] = item
    pass
df['o_date'] = pd.to_datetime(df['tradeDate'])
online_symbol_list = []
df_group = df.groupby(by='ticker',as_index=False)
self.thread_out_log('开始逐一处理每个合约:')
for name,group in df_group:
    self.thread_out_log(f'开始追加 {name}')
    # 将新数据添加到 t_last30_daily
    pre_new_df = group.loc[:,self.d_csv_column_list].copy()
    pre_new_df['product_code'] = ticker_map[name][0]
    pre_new_df['ticker'] = ticker_map[name][1]
    pre_new_df['deliYear'] = ticker_map[name][2]
    pre_new_list = pre_new_df.loc[:,self.last30_daily_column_list].values.tolist()
    sqlite_tool.batch_insert_last30_daily(pre_new_list)

    # 将新数据更新到csv
    csv_file_name = f"{name}_{ticker_map[name][2]}.csv"
    csv_file_path = YOUKUANG_D_DIR + csv_file_name
    if not os.path.exists(csv_file_path):
        exist_df = pd.DataFrame()
    else:
        df_one = pd.read_csv(csv_file_path,encoding='utf-8')
        df_one['o_date'] = pd.to_datetime(df_one['tradeDate'])
        exist_df = df_one.loc[:,self.pre_d_csv_column_list].copy()
        pass

    new_df = group.loc[:,self.pre_d_csv_column_list].copy()
    if len(exist_df)>0:
        exist_df.sort_values(by='o_date',ascending=True,inplace=True)
        new_df = new_df.loc[new_df['o_date']>exist_df.iloc[-1]['o_date']].copy()
        pass
    if len(new_df)>0:
        pre_save_df = pd.concat([exist_df,new_df])
        pre_save_df.sort_values(by='o_date',ascending=True,inplace=True)
        pre_save_df = pre_save_df.loc[:,self.d_csv_column_list].copy()
        pre_save_df.to_csv(csv_file_path,encoding='utf-8')

        online_symbol_list.append(
            [ticker_map[name][0],name,ticker_map[name][2],pre_save_df.iloc[-1]['tradeDate']]
        )
        pass
    pass

# 删除 t_online_symbol 表格,加入新数据
self.thread_out_log('t_online_symbol表格更新')
sqlite_tool.delete_all_data_of_online_symbol()
sqlite_tool.batch_insert_online_symbol(online_symbol_list)

self.thread_out_log('------------- 开始计算主力合约:')
product_code_list = sqlite_tool.query_all_product_codes_of_product()
for pro_code in product_code_list:
    self.thread_out_log(f'产品:{pro_code}')
    csv_main_file_path = YOUKUANG_MAIN_DIR + pro_code + '.csv'
    if not os.path.exists(csv_main_file_path):
        # 新增品种
        cur_main_ticker = None
        cur_main_deliYear = None
        last_one_tradeDate = None
        pass
    else:
        df_one = pd.read_csv(csv_main_file_path,encoding='utf-8')
        df_one['o_date'] = pd.to_datetime(df_one['tradeDate'])
        df_one.sort_values(by='o_date',ascending=True,inplace=True)
        cur_main_ticker = df_one.iloc[-1]['ticker']
        cur_main_deliYear = df_one.iloc[-1]['deliYear']
        last_one_tradeDate = df_one.iloc[-1]['tradeDate']
        pass

    # 从 t_last30_daily 中计算追加这几天的主力合约
    daily_list = sqlite_tool.query_daily_by_pro_in_last30_daily(pro_code)
    df_two = pd.DataFrame(columns=self.pre_main_csv_column_list,data=daily_list)
    df_two['o_date'] = pd.to_datetime(df_two['tradeDate'])
    if last_one_tradeDate:
        df_two = df_two.loc[df_two['o_date']>last_one_tradeDate].copy()
    df_two.dropna(inplace=True)
    if len(df_two) <= 0:
        self.thread_out_log(f'{pro_code},没有可追加的主力数据')
        continue
    df_two['row_i'] = [i for i in range(len(df_two))]
    df_two_group = df_two.groupby(by='o_date',as_index=False)
    df_main_new = pd.DataFrame()
    next_change_yeah = False
    pre_next_main_ticker = None
    pre_next_main_deliYear = None
    for two_name,group in df_two_group:
        if len(group)<=1:
            # 当天该品种只有一个合约,那当天主力合约就为该合约
            df_main_new = pd.concat([df_main_new,group.iloc[[0]]])
            sqlite_tool.input_main_symbol(pro_code, group.iloc[0]['ticker'], group.iloc[0]['deliYear'],
              group.iloc[0]['tradeDate'])
            cur_main_ticker = group.iloc[0]['ticker']
            cur_main_deliYear = group.iloc[0]['deliYear']
            pass
        else:
            # 计算每日最大成交量和最大持仓量
            df_vol = group.sort_values(by='turnoverVol', ascending=False)
            df_inte = group.sort_values(by='openInt', ascending=False)
            if df_vol.iloc[0]['row_i'] == df_inte.iloc[0]['row_i']:
# 当日成交量最大和持仓量最大 为同一个合约
if next_change_yeah:
    # 前一个交易日满足 【主力合约要被切换,下一个交易日切换为新的合约】条件,
    # 检查今日新的合约是否依然是成交量和持仓量最大,如果不是,不切换新合约
    if pre_next_main_ticker == df_vol.iloc[0]['ticker'] and pre_next_main_deliYear == \
            df_vol.iloc[0]['deliYear']:
        # 切换新合约
        df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])
        sqlite_tool.input_main_symbol(pro_code, df_vol.iloc[0]['ticker'], df_vol.iloc[0]['deliYear'],
          df_vol.iloc[0]['tradeDate'])
        cur_main_ticker = pre_next_main_ticker
        cur_main_deliYear = pre_next_main_deliYear
        next_change_yeah = False
        pass
    else:
        # 撤销昨日的 【主力合约要被切换,下一个交易日切换为新的合约】
        next_change_yeah = False
        # ----------- 【当日成交量最大和持仓量最大 为同一个合约】 延续当前合约 start
        pre_next_main_ticker = df_vol.iloc[0]['ticker']
        pre_next_main_deliYear = df_vol.iloc[0]['deliYear']
        if pre_next_main_ticker == cur_main_ticker and pre_next_main_deliYear == cur_main_deliYear:
            # 主力合约没有切换,延续使用
            df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])
            pass
        else:
            # 主力合约要被切换,下一个交易日切换为新的合约
            next_change_yeah = True
            cur_df = group.loc[(group['ticker'] == cur_main_ticker) & (
    group['deliYear'] == cur_main_deliYear)].copy()
            df_main_new = pd.concat([df_main_new, cur_df])
            pass
        # ----------- 【当日成交量最大和持仓量最大 为同一个合约】 延续当前合约 end
        pass
    pass
else:
    # ----------- 【当日成交量最大和持仓量最大 为同一个合约】 延续当前合约 start
    pre_next_main_ticker = df_vol.iloc[0]['ticker']
    pre_next_main_deliYear = df_vol.iloc[0]['deliYear']
    if pre_next_main_ticker == cur_main_ticker and pre_next_main_deliYear == cur_main_deliYear:
        # 主力合约没有切换,延续使用
        df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])
        pass
    else:
        # 主力合约要被切换,下一个交易日切换为新的合约
        next_change_yeah = True
        cur_df = group.loc[
            (group['ticker'] == cur_main_ticker) & (
        group['deliYear'] == cur_main_deliYear)].copy()
        df_main_new = pd.concat([df_main_new, cur_df])
        pass
    # ----------- 【当日成交量最大和持仓量最大 为同一个合约】 延续当前合约 end
    pass
            else:
# 当日成交量最大 和 持仓量最大 为不同的合约
# 成交量最大 或 持仓量最大,这两者中有一个的合约是当前主力合约,该主力合约就延续
if df_vol.iloc[0]['ticker'] == cur_main_ticker and df_vol.iloc[0][
    'deliYear'] == cur_main_deliYear:
    df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])
elif df_inte.iloc[0]['ticker'] == cur_main_ticker and df_inte.iloc[0][
    'deliYear'] == cur_main_deliYear:
    df_main_new = pd.concat([df_main_new, df_inte.iloc[[0]]])
else:
    # 当日成交量最大 和 持仓量最大 都不是当前主力合约,将主力合约切换为成交量最大的合约
    df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])
    sqlite_tool.input_main_symbol(pro_code, df_vol.iloc[0]['ticker'], df_vol.iloc[0]['deliYear'],
      df_vol.iloc[0]['tradeDate'])
    cur_main_ticker = df_vol.iloc[0]['ticker']
    cur_main_deliYear = df_vol.iloc[0]['deliYear']
    pass
pass
            pass
        pass

    if len(df_main_new)<=0:
        self.thread_out_log(f'{pro_code},没有新增主力合约数据')
        pass
    else:
        df_main_new = df_main_new.loc[:,self.main_csv_column_list].copy()
        if not os.path.exists(csv_main_file_path):
            df_main_new.to_csv(csv_main_file_path, encoding='utf-8')
            pass
        else:
            df_one = df_one.loc[:, self.main_csv_column_list].copy()
            df_main_new00 = pd.concat([df_one, df_main_new])
            df_main_new00 = df_main_new00.loc[:, self.main_csv_column_list].copy()
            df_main_new00.to_csv(csv_main_file_path, encoding='utf-8')
            pass
        pass
    pass

pre_map = {
    'mark_str': self.mark_str_step_two,
    'data':None
}
self.signal_excute.emit(pre_map)
pass

文章来源:https://blog.csdn.net/m0_37967652/article/details/135431074
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。