diff --git a/app.py b/app.py index d3eed2e..886d334 100644 --- a/app.py +++ b/app.py @@ -22,6 +22,7 @@ from sector_strategy_ui import display_sector_strategy from longhubang_ui import display_longhubang from smart_monitor_ui import smart_monitor_ui +from momentum_filter_ui import display_momentum_filter # 页面配置 st.set_page_config( @@ -294,7 +295,7 @@ def main(): if st.button("🏠 股票分析", width='stretch', key="nav_home", help="返回首页,进行单只股票的深度分析"): # 清除所有功能页面标志 for key in ['show_history', 'show_monitor', 'show_config', 'show_main_force', - 'show_sector_strategy', 'show_longhubang', 'show_portfolio']: + 'show_sector_strategy', 'show_longhubang', 'show_portfolio', 'show_momentum_filter']: if key in st.session_state: del st.session_state[key] @@ -307,7 +308,14 @@ def main(): if st.button("💰 主力选股", width='stretch', key="nav_main_force", help="基于主力资金流向的选股策略"): st.session_state.show_main_force = True for key in ['show_history', 'show_monitor', 'show_config', 'show_sector_strategy', - 'show_longhubang', 'show_portfolio']: + 'show_longhubang', 'show_portfolio', 'show_momentum_filter']: + if key in st.session_state: + del st.session_state[key] + + if st.button("🚀 一进二策略", width='stretch', key="nav_momentum_filter", help="捕捉首板涨停后续二板机会"): + st.session_state.show_momentum_filter = True + for key in ['show_history', 'show_monitor', 'show_config', 'show_sector_strategy', + 'show_longhubang', 'show_portfolio', 'show_main_force']: if key in st.session_state: del st.session_state[key] @@ -318,14 +326,14 @@ def main(): if st.button("🎯 智策板块", width='stretch', key="nav_sector_strategy", help="AI板块策略分析"): st.session_state.show_sector_strategy = True for key in ['show_history', 'show_monitor', 'show_config', 'show_main_force', - 'show_longhubang', 'show_portfolio', 'show_smart_monitor']: + 'show_longhubang', 'show_portfolio', 'show_smart_monitor', 'show_momentum_filter']: if key in st.session_state: del st.session_state[key] if st.button("🐉 智瞰龙虎", width='stretch', key="nav_longhubang", help="龙虎榜深度分析"): st.session_state.show_longhubang = True for key in ['show_history', 'show_monitor', 'show_config', 'show_main_force', - 'show_sector_strategy', 'show_portfolio', 'show_smart_monitor']: + 'show_sector_strategy', 'show_portfolio', 'show_smart_monitor', 'show_momentum_filter']: if key in st.session_state: del st.session_state[key] @@ -336,21 +344,21 @@ def main(): if st.button("📊 持仓分析", width='stretch', key="nav_portfolio", help="投资组合分析与定时跟踪"): st.session_state.show_portfolio = True for key in ['show_history', 'show_monitor', 'show_config', 'show_main_force', - 'show_sector_strategy', 'show_longhubang', 'show_smart_monitor']: + 'show_sector_strategy', 'show_longhubang', 'show_smart_monitor', 'show_momentum_filter']: if key in st.session_state: del st.session_state[key] if st.button("🤖 AI盯盘", width='stretch', key="nav_smart_monitor", help="DeepSeek AI自动盯盘决策交易(支持A股T+1)"): st.session_state.show_smart_monitor = True for key in ['show_history', 'show_monitor', 'show_config', 'show_main_force', - 'show_sector_strategy', 'show_longhubang', 'show_portfolio']: + 'show_sector_strategy', 'show_longhubang', 'show_portfolio', 'show_momentum_filter']: if key in st.session_state: del st.session_state[key] if st.button("📡 实时监测", width='stretch', key="nav_monitor", help="价格监控与预警提醒"): st.session_state.show_monitor = True for key in ['show_history', 'show_main_force', 'show_longhubang', 'show_portfolio', - 'show_config', 'show_sector_strategy', 'show_smart_monitor']: + 'show_config', 'show_sector_strategy', 'show_smart_monitor', 'show_momentum_filter']: if key in st.session_state: del st.session_state[key] @@ -360,7 +368,7 @@ def main(): if st.button("📖 历史记录", width='stretch', key="nav_history", help="查看历史分析记录"): st.session_state.show_history = True for key in ['show_monitor', 'show_longhubang', 'show_portfolio', 'show_config', - 'show_main_force', 'show_sector_strategy']: + 'show_main_force', 'show_sector_strategy', 'show_momentum_filter']: if key in st.session_state: del st.session_state[key] @@ -368,7 +376,7 @@ def main(): if st.button("⚙️ 环境配置", width='stretch', key="nav_config", help="系统设置与API配置"): st.session_state.show_config = True for key in ['show_history', 'show_monitor', 'show_main_force', 'show_sector_strategy', - 'show_longhubang', 'show_portfolio']: + 'show_longhubang', 'show_portfolio', 'show_momentum_filter']: if key in st.session_state: del st.session_state[key] @@ -461,6 +469,11 @@ def main(): display_main_force_selector() return + # 检查是否显示一进二策略 + if 'show_momentum_filter' in st.session_state and st.session_state.show_momentum_filter: + display_momentum_filter() + return + # 检查是否显示智策板块 if 'show_sector_strategy' in st.session_state and st.session_state.show_sector_strategy: display_sector_strategy() diff --git a/momentum_filter_data.py b/momentum_filter_data.py new file mode 100644 index 0000000..0027373 --- /dev/null +++ b/momentum_filter_data.py @@ -0,0 +1,365 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +一进二策略数据获取模块 +获取涨停板股票数据,支持多种数据源 +""" + +import pandas as pd +import akshare as ak +import pywencai +from datetime import datetime, timedelta +from typing import Dict, List, Tuple, Optional +import time +import numpy as np + + +class MomentumFilterData: + """一进二策略数据获取类""" + + def __init__(self): + self.raw_data = None + self.limit_up_stocks = None + + def get_limit_up_stocks(self, trade_date: str = None) -> Tuple[bool, pd.DataFrame, str]: + """ + 获取指定日期的涨停板股票(首板) + + Args: + trade_date: 交易日期,格式"YYYYMMDD"或"YYYY-MM-DD",默认为最近交易日 + + Returns: + (success, dataframe, message) + """ + try: + # 如果没有指定日期,使用最近交易日 + if not trade_date: + trade_date = datetime.now().strftime("%Y%m%d") + + # 统一日期格式为YYYYMMDD + trade_date_str = trade_date.replace("-", "") + + print(f"\n{'='*60}") + print(f"🔍 一进二策略 - 获取涨停板数据") + print(f"{'='*60}") + print(f"交易日期: {trade_date_str}") + + # 尝试多种方法获取涨停板数据 + df = None + + # 方法1: 使用问财获取涨停板数据(推荐) + try: + df = self._get_limit_up_from_wencai(trade_date_str) + if df is not None and not df.empty: + print(f"✅ 使用问财成功获取 {len(df)} 只涨停股票") + except Exception as e: + print(f"⚠️ 问财获取失败: {str(e)}") + + # 方法2: 使用akshare获取涨停板数据(备用) + if df is None or df.empty: + try: + df = self._get_limit_up_from_akshare(trade_date_str) + if df is not None and not df.empty: + print(f"✅ 使用akshare成功获取 {len(df)} 只涨停股票") + except Exception as e: + print(f"⚠️ akshare获取失败: {str(e)}") + + if df is None or df.empty: + return False, None, "未能获取到涨停板数据,请检查日期是否为交易日" + + self.raw_data = df + return True, df, f"成功获取{len(df)}只涨停股票数据" + + except Exception as e: + error_msg = f"获取涨停板数据失败: {str(e)}" + print(f"\n❌ {error_msg}") + return False, None, error_msg + + def _get_limit_up_from_wencai(self, trade_date: str) -> Optional[pd.DataFrame]: + """ + 使用问财获取涨停板数据 + + Args: + trade_date: 交易日期字符串,格式YYYYMMDD + + Returns: + DataFrame或None + """ + try: + # 转换日期格式为问财可识别的格式 + date_obj = datetime.strptime(trade_date, "%Y%m%d") + wencai_date = f"{date_obj.year}年{date_obj.month}月{date_obj.day}日" + + # 构建查询语句 - 获取首板涨停股票及相关数据 + queries = [ + # 方案1: 完整查询 + f"{wencai_date}涨停,非st,非科创板,非一字板," + f"涨停时间,涨停封单量,炸板次数,所属板块,流通市值," + f"昨日成交额,今日成交额,换手率,市盈率,股价," + f"连板天数=1", + + # 方案2: 简化查询 + f"{wencai_date}涨停板,排除st,排除科创板,排除一字板," + f"涨停时间,封板金额,所属行业,流通市值,股价", + + # 方案3: 基础查询 + f"{wencai_date}涨停股票,非st非科创板,流通市值,股价,所属行业", + ] + + for i, query in enumerate(queries, 1): + print(f" 尝试问财方案 {i}/{len(queries)}...") + try: + result = pywencai.get(query=query, loop=True) + + if result is None: + continue + + df = self._convert_to_dataframe(result) + + if df is not None and not df.empty: + # 数据清洗和标准化 + df = self._clean_limit_up_data(df) + return df + + except Exception as e: + print(f" 问财方案{i}失败: {str(e)}") + time.sleep(1) + continue + + return None + + except Exception as e: + print(f"问财获取失败: {e}") + return None + + def _get_limit_up_from_akshare(self, trade_date: str) -> Optional[pd.DataFrame]: + """ + 使用akshare获取涨停板数据 + + Args: + trade_date: 交易日期字符串,格式YYYYMMDD + + Returns: + DataFrame或None + """ + try: + # 转换日期格式 + date_str = f"{trade_date[:4]}-{trade_date[4:6]}-{trade_date[6:]}" + + # 获取涨停板数据 + df = ak.stock_zt_pool_em(date=date_str) + + if df is None or df.empty: + return None + + # 数据标准化 + df = self._standardize_akshare_data(df) + + return df + + except Exception as e: + print(f"akshare获取失败: {e}") + return None + + def _convert_to_dataframe(self, result) -> Optional[pd.DataFrame]: + """转换问财返回结果为DataFrame""" + try: + if isinstance(result, pd.DataFrame): + return result + elif isinstance(result, dict): + if 'tableV1' in result: + table_data = result['tableV1'] + if isinstance(table_data, pd.DataFrame): + return table_data + elif isinstance(table_data, list): + return pd.DataFrame(table_data) + return pd.DataFrame([result]) + elif isinstance(result, list): + return pd.DataFrame(result) + return None + except Exception as e: + print(f"转换DataFrame失败: {e}") + return None + + def _clean_limit_up_data(self, df: pd.DataFrame) -> pd.DataFrame: + """清洗和标准化涨停板数据""" + try: + # 标准化列名(问财返回的列名可能不同) + column_mapping = { + '股票代码': 'code', + '代码': 'code', + '股票简称': 'name', + '名称': 'name', + '最新价': 'price', + '现价': 'price', + '涨停价': 'limit_price', + '涨跌幅': 'change_pct', + '涨跌幅%': 'change_pct', + '涨停时间': 'limit_time', + '首次涨停时间': 'limit_time', + '封板金额': 'seal_amount', + '涨停封单量': 'seal_amount', + '炸板次数': 'broken_times', + '打开次数': 'broken_times', + '流通市值': 'circulation_market_cap', + '流通市值(元)': 'circulation_market_cap', + '总市值': 'total_market_cap', + '所属板块': 'sector', + '所属行业': 'sector', + '行业': 'sector', + '换手率': 'turnover_rate', + '换手率%': 'turnover_rate', + '昨日成交额': 'yesterday_volume', + '今日成交额': 'today_volume', + '成交额': 'today_volume', + '市盈率': 'pe_ratio', + '市盈率(动态)': 'pe_ratio', + '连板天数': 'continuous_limit', + } + + # 重命名列 + for old_name, new_name in column_mapping.items(): + if old_name in df.columns: + df = df.rename(columns={old_name: new_name}) + + # 确保必要的列存在 + required_columns = ['code', 'name'] + for col in required_columns: + if col not in df.columns: + print(f"⚠️ 缺少必要列: {col}") + + # 数据类型转换 + numeric_columns = ['price', 'change_pct', 'seal_amount', 'circulation_market_cap', + 'total_market_cap', 'turnover_rate', 'pe_ratio', 'broken_times'] + + for col in numeric_columns: + if col in df.columns: + df[col] = pd.to_numeric(df[col], errors='coerce') + + # 处理市值单位(统一为亿元) + if 'circulation_market_cap' in df.columns: + # 如果是元为单位,转换为亿元 + max_val = df['circulation_market_cap'].max() + if max_val > 10000: # 假设超过10000的是元为单位 + df['circulation_market_cap'] = df['circulation_market_cap'] / 100000000 + + if 'total_market_cap' in df.columns: + max_val = df['total_market_cap'].max() + if max_val > 10000: + df['total_market_cap'] = df['total_market_cap'] / 100000000 + + return df + + except Exception as e: + print(f"数据清洗失败: {e}") + return df + + def _standardize_akshare_data(self, df: pd.DataFrame) -> pd.DataFrame: + """标准化akshare数据格式""" + try: + # akshare涨停板数据的列名映射 + column_mapping = { + '代码': 'code', + '名称': 'name', + '涨跌幅': 'change_pct', + '最新价': 'price', + '涨停价': 'limit_price', + '成交额': 'today_volume', + '流通市值': 'circulation_market_cap', + '总市值': 'total_market_cap', + '换手率': 'turnover_rate', + '封板资金': 'seal_amount', + '首次封板时间': 'limit_time', + '最后封板时间': 'last_limit_time', + '炸板次数': 'broken_times', + '涨停统计': 'limit_statistics', + } + + # 重命名列 + for old_name, new_name in column_mapping.items(): + if old_name in df.columns: + df = df.rename(columns={old_name: new_name}) + + # 数据类型转换 + numeric_columns = ['price', 'change_pct', 'seal_amount', 'circulation_market_cap', + 'total_market_cap', 'turnover_rate', 'broken_times'] + + for col in numeric_columns: + if col in df.columns: + df[col] = pd.to_numeric(df[col], errors='coerce') + + # 处理市值单位 + if 'circulation_market_cap' in df.columns: + max_val = df['circulation_market_cap'].max() + if max_val > 10000: + df['circulation_market_cap'] = df['circulation_market_cap'] / 100000000 + + if 'total_market_cap' in df.columns: + max_val = df['total_market_cap'].max() + if max_val > 10000: + df['total_market_cap'] = df['total_market_cap'] / 100000000 + + return df + + except Exception as e: + print(f"akshare数据标准化失败: {e}") + return df + + def get_stock_historical_data(self, code: str, days: int = 60) -> Optional[pd.DataFrame]: + """ + 获取个股历史数据 + + Args: + code: 股票代码 + days: 获取天数 + + Returns: + DataFrame或None + """ + try: + # 计算开始日期 + end_date = datetime.now() + start_date = end_date - timedelta(days=days) + + start_str = start_date.strftime("%Y%m%d") + end_str = end_date.strftime("%Y%m%d") + + # 使用akshare获取历史数据 + df = ak.stock_zh_a_hist(symbol=code, period="daily", + start_date=start_str, end_date=end_str, adjust="qfq") + + return df + + except Exception as e: + print(f"获取{code}历史数据失败: {e}") + return None + + def get_stock_basic_info(self, code: str) -> Dict: + """ + 获取股票基本信息 + + Args: + code: 股票代码 + + Returns: + 字典格式的基本信息 + """ + try: + # 使用akshare获取个股信息 + info = ak.stock_individual_info_em(symbol=code) + + if info is None or info.empty: + return {} + + # 转换为字典 + info_dict = {} + for _, row in info.iterrows(): + key = row.get('item', '') + value = row.get('value', '') + info_dict[key] = value + + return info_dict + + except Exception as e: + print(f"获取{code}基本信息失败: {e}") + return {} diff --git a/momentum_filter_engine.py b/momentum_filter_engine.py new file mode 100644 index 0000000..5cea06a --- /dev/null +++ b/momentum_filter_engine.py @@ -0,0 +1,427 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +一进二策略筛选引擎 +实现一进二策略的智能筛选和评分逻辑 +""" + +import pandas as pd +import numpy as np +from datetime import datetime, timedelta +from typing import Dict, List, Tuple, Optional +from momentum_filter_data import MomentumFilterData + + +class MomentumFilterEngine: + """一进二策略筛选引擎""" + + def __init__(self): + self.data_fetcher = MomentumFilterData() + self.filtered_stocks = None + self.scored_stocks = None + + def filter_first_board_stocks(self, + df: pd.DataFrame, + max_price: float = 30.0, + max_market_cap: float = 100.0, + exclude_sh: bool = True, + exclude_cyb: bool = True, + exclude_one_word: bool = True) -> pd.DataFrame: + """ + 筛选符合一进二条件的首板股票 + + Args: + df: 原始涨停板数据 + max_price: 最大股价(元) + max_market_cap: 最大流通市值(亿元) + exclude_sh: 是否排除沪市股票 + exclude_cyb: 是否排除创业板(300开头) + exclude_one_word: 是否排除一字板 + + Returns: + 筛选后的DataFrame + """ + try: + print(f"\n{'='*60}") + print(f"📊 一进二策略 - 首板筛选") + print(f"{'='*60}") + print(f"原始涨停股票数量: {len(df)}") + + filtered_df = df.copy() + + # 1. 股价筛选 + if 'price' in filtered_df.columns: + before_count = len(filtered_df) + filtered_df = filtered_df[filtered_df['price'] <= max_price] + print(f"✓ 股价≤{max_price}元: 剩余 {len(filtered_df)} 只 (过滤 {before_count - len(filtered_df)} 只)") + + # 2. 市值筛选 + if 'circulation_market_cap' in filtered_df.columns: + before_count = len(filtered_df) + filtered_df = filtered_df[filtered_df['circulation_market_cap'] <= max_market_cap] + print(f"✓ 流通市值≤{max_market_cap}亿: 剩余 {len(filtered_df)} 只 (过滤 {before_count - len(filtered_df)} 只)") + + # 3. 排除沪市股票(60开头) + if exclude_sh and 'code' in filtered_df.columns: + before_count = len(filtered_df) + filtered_df = filtered_df[~filtered_df['code'].astype(str).str.startswith('6')] + print(f"✓ 排除沪市股票: 剩余 {len(filtered_df)} 只 (过滤 {before_count - len(filtered_df)} 只)") + + # 4. 排除创业板(300开头) + if exclude_cyb and 'code' in filtered_df.columns: + before_count = len(filtered_df) + filtered_df = filtered_df[~filtered_df['code'].astype(str).str.startswith('300')] + print(f"✓ 排除创业板: 剩余 {len(filtered_df)} 只 (过滤 {before_count - len(filtered_df)} 只)") + + # 5. 排除一字板(没有换手或换手率极低) + if exclude_one_word and 'turnover_rate' in filtered_df.columns: + before_count = len(filtered_df) + # 换手率低于0.5%的认为是一字板 + filtered_df = filtered_df[ + (filtered_df['turnover_rate'].isna()) | + (filtered_df['turnover_rate'] > 0.5) + ] + print(f"✓ 排除一字板: 剩余 {len(filtered_df)} 只 (过滤 {before_count - len(filtered_df)} 只)") + + # 6. 排除ST股票 + if 'name' in filtered_df.columns: + before_count = len(filtered_df) + filtered_df = filtered_df[~filtered_df['name'].str.contains('ST', na=False)] + print(f"✓ 排除ST股票: 剩余 {len(filtered_df)} 只 (过滤 {before_count - len(filtered_df)} 只)") + + print(f"\n最终筛选结果: {len(filtered_df)} 只股票") + self.filtered_stocks = filtered_df + + return filtered_df + + except Exception as e: + print(f"❌ 筛选失败: {str(e)}") + return df + + def score_stocks(self, df: pd.DataFrame) -> pd.DataFrame: + """ + 对筛选后的股票进行评分 + + 评分维度: + 1. 涨停时间(越早越好)- 30分 + 2. 封单强度(封单金额/流通市值)- 25分 + 3. 炸板次数(越少越好)- 20分 + 4. 换手率(适中为好)- 15分 + 5. 市值大小(越小越好)- 10分 + + Args: + df: 筛选后的股票数据 + + Returns: + 带有评分的DataFrame + """ + try: + print(f"\n{'='*60}") + print(f"⭐ 一进二策略 - 股票评分") + print(f"{'='*60}") + + scored_df = df.copy() + scored_df['score'] = 0.0 + + # 1. 涨停时间评分(30分) + if 'limit_time' in scored_df.columns: + scored_df['time_score'] = self._score_limit_time(scored_df['limit_time']) + print(f"✓ 涨停时间评分完成") + else: + scored_df['time_score'] = 15.0 # 默认中等分 + + # 2. 封单强度评分(25分) + if 'seal_amount' in scored_df.columns and 'circulation_market_cap' in scored_df.columns: + scored_df['seal_score'] = self._score_seal_strength( + scored_df['seal_amount'], + scored_df['circulation_market_cap'] + ) + print(f"✓ 封单强度评分完成") + else: + scored_df['seal_score'] = 12.5 # 默认中等分 + + # 3. 炸板次数评分(20分) + if 'broken_times' in scored_df.columns: + scored_df['broken_score'] = self._score_broken_times(scored_df['broken_times']) + print(f"✓ 炸板次数评分完成") + else: + scored_df['broken_score'] = 10.0 # 默认中等分 + + # 4. 换手率评分(15分) + if 'turnover_rate' in scored_df.columns: + scored_df['turnover_score'] = self._score_turnover_rate(scored_df['turnover_rate']) + print(f"✓ 换手率评分完成") + else: + scored_df['turnover_score'] = 7.5 # 默认中等分 + + # 5. 市值评分(10分) + if 'circulation_market_cap' in scored_df.columns: + scored_df['cap_score'] = self._score_market_cap(scored_df['circulation_market_cap']) + print(f"✓ 市值评分完成") + else: + scored_df['cap_score'] = 5.0 # 默认中等分 + + # 计算总分 + scored_df['total_score'] = ( + scored_df['time_score'] + + scored_df['seal_score'] + + scored_df['broken_score'] + + scored_df['turnover_score'] + + scored_df['cap_score'] + ) + + # 按总分排序 + scored_df = scored_df.sort_values('total_score', ascending=False) + + print(f"\n✅ 评分完成,最高分: {scored_df['total_score'].max():.1f},最低分: {scored_df['total_score'].min():.1f}") + + self.scored_stocks = scored_df + + return scored_df + + except Exception as e: + print(f"❌ 评分失败: {str(e)}") + return df + + def _score_limit_time(self, limit_time_series: pd.Series) -> pd.Series: + """ + 涨停时间评分 + 9:30-10:00: 25-30分(最强) + 10:00-11:00: 15-25分 + 11:00-14:00: 5-15分 + 14:00-15:00: 0-5分(最弱) + """ + scores = pd.Series(index=limit_time_series.index, dtype=float) + + for idx, time_str in limit_time_series.items(): + if pd.isna(time_str): + scores[idx] = 15.0 # 默认中等分 + continue + + try: + # 处理不同的时间格式 + time_str = str(time_str).strip() + + # 尝试解析时间 + if ':' in time_str: + # 格式: "09:30:00" 或 "09:30" + parts = time_str.split(':') + hour = int(parts[0]) + minute = int(parts[1]) + elif len(time_str) >= 4: + # 格式: "0930" 或 "093000" + hour = int(time_str[:2]) + minute = int(time_str[2:4]) + else: + scores[idx] = 15.0 + continue + + # 计算分钟数(从9:30开始) + minutes_from_start = (hour - 9) * 60 + minute - 30 + + if minutes_from_start < 0: + minutes_from_start = 0 + elif minutes_from_start > 330: # 15:00之后 + minutes_from_start = 330 + + # 评分逻辑 + if minutes_from_start <= 30: # 9:30-10:00 + scores[idx] = 30.0 - (minutes_from_start / 30) * 5 # 25-30分 + elif minutes_from_start <= 90: # 10:00-11:00 + scores[idx] = 25.0 - ((minutes_from_start - 30) / 60) * 10 # 15-25分 + elif minutes_from_start <= 270: # 11:00-14:00 + scores[idx] = 15.0 - ((minutes_from_start - 90) / 180) * 10 # 5-15分 + else: # 14:00-15:00 + scores[idx] = 5.0 - ((minutes_from_start - 270) / 60) * 5 # 0-5分 + + except: + scores[idx] = 15.0 # 解析失败,给默认分 + + return scores + + def _score_seal_strength(self, seal_amount: pd.Series, market_cap: pd.Series) -> pd.Series: + """ + 封单强度评分 + 封单比例 = 封单金额 / 流通市值 + 比例越高,分数越高 + """ + scores = pd.Series(index=seal_amount.index, dtype=float) + + for idx in seal_amount.index: + try: + seal = seal_amount[idx] + cap = market_cap[idx] + + if pd.isna(seal) or pd.isna(cap) or cap == 0: + scores[idx] = 12.5 # 默认中等分 + continue + + # 计算封单比例(百分比) + ratio = (seal / (cap * 100000000)) * 100 # 市值单位是亿 + + # 评分逻辑 + if ratio >= 10: # 封单比例>=10%,非常强 + scores[idx] = 25.0 + elif ratio >= 5: # 5-10%,很强 + scores[idx] = 20.0 + (ratio - 5) / 5 * 5 + elif ratio >= 2: # 2-5%,较强 + scores[idx] = 15.0 + (ratio - 2) / 3 * 5 + elif ratio >= 1: # 1-2%,一般 + scores[idx] = 10.0 + (ratio - 1) * 5 + else: # <1%,较弱 + scores[idx] = ratio * 10 + + except: + scores[idx] = 12.5 + + return scores + + def _score_broken_times(self, broken_times: pd.Series) -> pd.Series: + """ + 炸板次数评分 + 0次: 20分(最佳) + 1次: 15分 + 2次: 10分 + 3次及以上: 5分 + """ + scores = pd.Series(index=broken_times.index, dtype=float) + + for idx, times in broken_times.items(): + if pd.isna(times): + scores[idx] = 20.0 # 假设没有炸板 + elif times == 0: + scores[idx] = 20.0 + elif times == 1: + scores[idx] = 15.0 + elif times == 2: + scores[idx] = 10.0 + else: + scores[idx] = 5.0 + + return scores + + def _score_turnover_rate(self, turnover_rate: pd.Series) -> pd.Series: + """ + 换手率评分 + 5-15%: 15分(最佳,充分换手) + 3-5% 或 15-20%: 10-15分(较好) + 1-3% 或 20-30%: 5-10分(一般) + <1% 或 >30%: 0-5分(不好) + """ + scores = pd.Series(index=turnover_rate.index, dtype=float) + + for idx, rate in turnover_rate.items(): + if pd.isna(rate): + scores[idx] = 7.5 # 默认中等分 + continue + + if 5 <= rate <= 15: # 最佳区间 + scores[idx] = 15.0 + elif 3 <= rate < 5: # 偏低但可接受 + scores[idx] = 10.0 + (rate - 3) / 2 * 5 + elif 15 < rate <= 20: # 偏高但可接受 + scores[idx] = 15.0 - (rate - 15) / 5 * 5 + elif 1 <= rate < 3: # 较低 + scores[idx] = 5.0 + (rate - 1) / 2 * 5 + elif 20 < rate <= 30: # 较高 + scores[idx] = 10.0 - (rate - 20) / 10 * 5 + elif rate < 1: # 很低(可能是一字板) + scores[idx] = rate * 5 + else: # >30%,换手太大 + scores[idx] = max(0, 5.0 - (rate - 30) / 10) + + return scores + + def _score_market_cap(self, market_cap: pd.Series) -> pd.Series: + """ + 市值评分 + 市值越小,分数越高 + <30亿: 10分 + 30-50亿: 7-10分 + 50-80亿: 5-7分 + >80亿: 0-5分 + """ + scores = pd.Series(index=market_cap.index, dtype=float) + + for idx, cap in market_cap.items(): + if pd.isna(cap): + scores[idx] = 5.0 # 默认中等分 + continue + + if cap < 30: + scores[idx] = 10.0 + elif cap < 50: + scores[idx] = 10.0 - (cap - 30) / 20 * 3 + elif cap < 80: + scores[idx] = 7.0 - (cap - 50) / 30 * 2 + else: + scores[idx] = max(0, 5.0 - (cap - 80) / 40 * 5) + + return scores + + def get_top_stocks(self, n: int = 10) -> pd.DataFrame: + """ + 获取评分最高的前N只股票 + + Args: + n: 返回的股票数量 + + Returns: + 前N只股票的DataFrame + """ + if self.scored_stocks is None or self.scored_stocks.empty: + return pd.DataFrame() + + return self.scored_stocks.head(n) + + def generate_report(self, stock_df: pd.DataFrame) -> str: + """ + 生成选股报告 + + Args: + stock_df: 股票数据 + + Returns: + 报告文本 + """ + try: + if stock_df.empty: + return "暂无数据" + + report = [] + report.append("="*60) + report.append("一进二策略选股报告") + report.append("="*60) + report.append("") + + for idx, row in stock_df.iterrows(): + code = row.get('code', 'N/A') + name = row.get('name', 'N/A') + price = row.get('price', 0) + score = row.get('total_score', 0) + + report.append(f"【{name}】({code})") + report.append(f" 股价: {price:.2f}元") + report.append(f" 综合评分: {score:.1f}/100") + + if 'limit_time' in row: + report.append(f" 涨停时间: {row['limit_time']}") + + if 'circulation_market_cap' in row: + report.append(f" 流通市值: {row['circulation_market_cap']:.2f}亿") + + if 'turnover_rate' in row: + report.append(f" 换手率: {row['turnover_rate']:.2f}%") + + if 'broken_times' in row and not pd.isna(row['broken_times']): + report.append(f" 炸板次数: {int(row['broken_times'])}次") + + if 'sector' in row: + report.append(f" 所属板块: {row['sector']}") + + report.append("") + + return "\n".join(report) + + except Exception as e: + return f"生成报告失败: {str(e)}" diff --git a/momentum_filter_ui.py b/momentum_filter_ui.py new file mode 100644 index 0000000..53c45c6 --- /dev/null +++ b/momentum_filter_ui.py @@ -0,0 +1,446 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +一进二策略UI界面模块 +""" + +import streamlit as st +import pandas as pd +import plotly.express as px +import plotly.graph_objects as go +from datetime import datetime, timedelta +from momentum_filter_data import MomentumFilterData +from momentum_filter_engine import MomentumFilterEngine + + +def display_momentum_filter(): + """显示一进二策略界面""" + + # 页面标题 + st.markdown("## 🚀 一进二策略 - 智能选股") + st.markdown("---") + + # 策略说明 + with st.expander("📖 什么是一进二策略?", expanded=False): + st.markdown(""" + ### 策略概述 + + **一进二策略**是一种捕捉个股在首个涨停板(首板)后,于第二个交易日继续涨停(二板)的短线交易方法。 + + ### 核心原理 + + - **首板筛选**: 从当日所有涨停股票中,筛选出具有连板潜力的优质标的 + - **技术特征**: 关注涨停时间、封单强度、炸板次数、换手率等关键指标 + - **基本面**: 优选低价、小市值、深市股票,避免ST、科创板 + + ### 选股标准 + + 1. ✅ **股价**: 优选30元以下(最好20元以下) + 2. ✅ **市值**: 流通市值100亿以下(最好80亿以下) + 3. ✅ **市场**: 深市股票(排除60开头) + 4. ✅ **板块**: 排除创业板300开头、科创板688开头 + 5. ✅ **涨停质量**: 涨停时间早、封单强、无炸板或少炸板 + 6. ✅ **换手率**: 5-15%为最佳(充分换手) + 7. ✅ **排除**: ST股票、一字板 + + ### 评分维度 + + 本系统对首板股票进行综合评分(满分100分): + + - **涨停时间** (30分): 越早越好,9:30-10:00最佳 + - **封单强度** (25分): 封单金额/流通市值比例越高越好 + - **炸板次数** (20分): 0次最佳,次数越多越差 + - **换手率** (15分): 5-15%最佳,过低或过高都不好 + - **市值大小** (10分): 市值越小越好 + + ### 风险提示 + + ⚠️ 一进二策略属于高风险短线策略,需要: + - 严格的仓位管理(单票不超过三成) + - 及时止损(-5%到-7%) + - 关注市场整体情绪 + - 避免在市场情绪低迷时操作 + """) + + st.markdown("---") + + # 参数设置 + st.subheader("📋 筛选参数设置") + + col1, col2, col3 = st.columns(3) + + with col1: + # 日期选择 + trade_date_option = st.selectbox( + "选择交易日期", + ["今天", "昨天", "自定义日期"] + ) + + if trade_date_option == "今天": + trade_date = datetime.now().strftime("%Y%m%d") + elif trade_date_option == "昨天": + trade_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d") + else: + custom_date = st.date_input( + "选择日期", + value=datetime.now() - timedelta(days=1) + ) + trade_date = custom_date.strftime("%Y%m%d") + + st.info(f"📅 交易日期: {trade_date[:4]}-{trade_date[4:6]}-{trade_date[6:]}") + + with col2: + max_price = st.slider( + "最大股价(元)", + min_value=10, + max_value=100, + value=30, + step=5, + help="建议30元以下" + ) + + with col3: + max_market_cap = st.slider( + "最大流通市值(亿)", + min_value=20, + max_value=500, + value=100, + step=10, + help="建议100亿以下" + ) + + # 高级筛选选项 + with st.expander("⚙️ 高级筛选选项"): + col1, col2, col3 = st.columns(3) + + with col1: + exclude_sh = st.checkbox("排除沪市(60开头)", value=True, + help="沪市监管更严格,摸不着头脑") + + with col2: + exclude_cyb = st.checkbox("排除创业板(300开头)", value=True, + help="300首板20cm相当于2板,连板概率小") + + with col3: + exclude_one_word = st.checkbox("排除一字板", value=True, + help="一字板缺乏充分换手,后续动力不足") + + top_n = st.slider( + "显示前N只股票", + min_value=5, + max_value=50, + value=20, + step=5, + help="按综合评分从高到低显示" + ) + + st.markdown("---") + + # 开始筛选按钮 + if st.button("🔍 开始筛选", type="primary", use_container_width=True): + + with st.spinner("正在获取涨停板数据..."): + + # 创建数据获取器和引擎 + data_fetcher = MomentumFilterData() + engine = MomentumFilterEngine() + + # 获取涨停板数据 + success, df, message = data_fetcher.get_limit_up_stocks(trade_date) + + if not success or df is None or df.empty: + st.error(f"❌ {message}") + st.info("💡 提示: 请检查日期是否为交易日,或稍后重试") + return + + st.success(f"✅ {message}") + + # 显示原始数据 + with st.expander("📊 原始涨停板数据", expanded=False): + st.dataframe(df, use_container_width=True) + + with st.spinner("正在筛选优质首板..."): + + # 筛选首板股票 + filtered_df = engine.filter_first_board_stocks( + df, + max_price=max_price, + max_market_cap=max_market_cap, + exclude_sh=exclude_sh, + exclude_cyb=exclude_cyb, + exclude_one_word=exclude_one_word + ) + + if filtered_df.empty: + st.warning("⚠️ 没有符合条件的股票,请放宽筛选条件") + return + + with st.spinner("正在评分排序..."): + + # 对股票进行评分 + scored_df = engine.score_stocks(filtered_df) + + # 获取前N只股票 + top_stocks = engine.get_top_stocks(top_n) + + st.success(f"✅ 筛选完成!共找到 {len(filtered_df)} 只符合条件的股票") + + # 显示结果 + display_results(top_stocks, scored_df) + + # 保存到session state供后续使用 + st.session_state['momentum_filter_results'] = top_stocks + st.session_state['momentum_filter_all'] = scored_df + + +def display_results(top_stocks: pd.DataFrame, all_stocks: pd.DataFrame): + """显示筛选结果""" + + st.markdown("---") + st.subheader("📈 筛选结果") + + # 统计信息 + col1, col2, col3, col4 = st.columns(4) + + with col1: + st.metric("符合条件股票数", len(all_stocks)) + + with col2: + avg_score = all_stocks['total_score'].mean() + st.metric("平均得分", f"{avg_score:.1f}") + + with col3: + max_score = all_stocks['total_score'].max() + st.metric("最高得分", f"{max_score:.1f}") + + with col4: + min_score = all_stocks['total_score'].min() + st.metric("最低得分", f"{min_score:.1f}") + + # Top N 股票详细信息 + st.markdown("### 🏆 Top股票详情") + + for idx, row in top_stocks.iterrows(): + display_stock_card(row) + + # 评分分布图 + st.markdown("---") + st.subheader("📊 评分分布分析") + + col1, col2 = st.columns(2) + + with col1: + # 总分分布直方图 + fig = px.histogram( + all_stocks, + x='total_score', + nbins=20, + title="总分分布", + labels={'total_score': '综合评分', 'count': '股票数量'} + ) + fig.update_layout(showlegend=False) + st.plotly_chart(fig, use_container_width=True) + + with col2: + # Top股票评分对比 + if len(top_stocks) > 0: + top_10 = top_stocks.head(10) + fig = px.bar( + top_10, + x='name', + y='total_score', + title="Top10股票评分对比", + labels={'name': '股票名称', 'total_score': '综合评分'}, + text='total_score' + ) + fig.update_traces(texttemplate='%{text:.1f}', textposition='outside') + st.plotly_chart(fig, use_container_width=True) + + # 各维度评分雷达图 + st.markdown("### 🎯 评分维度分析") + + display_score_radar(top_stocks) + + # 完整数据表格 + st.markdown("---") + st.subheader("📋 完整数据表格") + + # 选择要显示的列 + display_columns = [ + 'code', 'name', 'price', 'total_score', + 'circulation_market_cap', 'turnover_rate', + 'limit_time', 'broken_times', 'sector' + ] + + # 过滤存在的列 + available_columns = [col for col in display_columns if col in all_stocks.columns] + + # 重命名列名(中文) + column_rename = { + 'code': '代码', + 'name': '名称', + 'price': '股价', + 'total_score': '综合评分', + 'circulation_market_cap': '流通市值(亿)', + 'turnover_rate': '换手率(%)', + 'limit_time': '涨停时间', + 'broken_times': '炸板次数', + 'sector': '所属板块' + } + + display_df = all_stocks[available_columns].copy() + display_df = display_df.rename(columns=column_rename) + + # 格式化数值 + if '股价' in display_df.columns: + display_df['股价'] = display_df['股价'].apply(lambda x: f"{x:.2f}" if pd.notna(x) else "N/A") + + if '综合评分' in display_df.columns: + display_df['综合评分'] = display_df['综合评分'].apply(lambda x: f"{x:.1f}" if pd.notna(x) else "N/A") + + if '流通市值(亿)' in display_df.columns: + display_df['流通市值(亿)'] = display_df['流通市值(亿)'].apply(lambda x: f"{x:.2f}" if pd.notna(x) else "N/A") + + if '换手率(%)' in display_df.columns: + display_df['换手率(%)'] = display_df['换手率(%)'].apply(lambda x: f"{x:.2f}" if pd.notna(x) else "N/A") + + st.dataframe(display_df, use_container_width=True, height=400) + + # 下载按钮 + csv = all_stocks.to_csv(index=False, encoding='utf-8-sig') + st.download_button( + label="📥 下载完整数据(CSV)", + data=csv, + file_name=f"momentum_filter_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv", + mime="text/csv" + ) + + +def display_stock_card(row: pd.Series): + """显示单个股票卡片""" + + code = row.get('code', 'N/A') + name = row.get('name', 'N/A') + price = row.get('price', 0) + total_score = row.get('total_score', 0) + + # 创建卡片 + with st.container(): + col1, col2, col3, col4 = st.columns([2, 2, 3, 3]) + + with col1: + st.markdown(f"### 【{name}】") + st.markdown(f"**代码**: {code}") + + with col2: + st.metric("股价", f"{price:.2f}元") + if 'circulation_market_cap' in row: + cap = row['circulation_market_cap'] + if pd.notna(cap): + st.metric("流通市值", f"{cap:.2f}亿") + + with col3: + st.metric("综合评分", f"{total_score:.1f}/100") + + # 评分等级 + if total_score >= 80: + grade = "🌟🌟🌟 优秀" + color = "green" + elif total_score >= 70: + grade = "⭐⭐ 良好" + color = "blue" + elif total_score >= 60: + grade = "⭐ 中等" + color = "orange" + else: + grade = "💫 一般" + color = "gray" + + st.markdown(f"**等级**: :{color}[{grade}]") + + with col4: + # 关键指标 + if 'limit_time' in row and pd.notna(row['limit_time']): + st.write(f"⏰ 涨停时间: {row['limit_time']}") + + if 'turnover_rate' in row and pd.notna(row['turnover_rate']): + st.write(f"🔄 换手率: {row['turnover_rate']:.2f}%") + + if 'broken_times' in row and pd.notna(row['broken_times']): + times = int(row['broken_times']) + st.write(f"💥 炸板次数: {times}次") + + if 'sector' in row and pd.notna(row['sector']): + st.write(f"📊 板块: {row['sector']}") + + # 评分详情 + with st.expander("📋 评分详情"): + score_cols = st.columns(5) + + score_items = [ + ('时间', 'time_score', 30), + ('封单', 'seal_score', 25), + ('炸板', 'broken_score', 20), + ('换手', 'turnover_score', 15), + ('市值', 'cap_score', 10) + ] + + for i, (label, key, max_score) in enumerate(score_items): + with score_cols[i]: + score = row.get(key, 0) + if pd.notna(score): + percentage = (score / max_score) * 100 + st.metric(label, f"{score:.1f}/{max_score}") + st.progress(percentage / 100) + + st.markdown("---") + + +def display_score_radar(stocks_df: pd.DataFrame): + """显示评分雷达图""" + + if stocks_df.empty: + return + + # 选择前5只股票 + top_5 = stocks_df.head(5) + + # 准备雷达图数据 + categories = ['涨停时间', '封单强度', '炸板次数', '换手率', '市值'] + score_columns = ['time_score', 'seal_score', 'broken_score', 'turnover_score', 'cap_score'] + max_scores = [30, 25, 20, 15, 10] + + fig = go.Figure() + + for idx, row in top_5.iterrows(): + name = row.get('name', 'N/A') + + # 计算百分比分数 + scores = [] + for col, max_score in zip(score_columns, max_scores): + score = row.get(col, 0) + if pd.notna(score): + percentage = (score / max_score) * 100 + scores.append(percentage) + else: + scores.append(0) + + fig.add_trace(go.Scatterpolar( + r=scores, + theta=categories, + fill='toself', + name=name + )) + + fig.update_layout( + polar=dict( + radialaxis=dict( + visible=True, + range=[0, 100] + ) + ), + showlegend=True, + title="Top5股票评分维度对比(百分比)" + ) + + st.plotly_chart(fig, use_container_width=True)