Ashare 学习教程
Ashare 学习教程
目录
1. 环境准备
1.1 系统要求
- Python 3.6+
- 操作系统: Windows / macOS / Linux
1.2 安装依赖
# 安装 pandas
pip install pandas
# 安装 requests
pip install requests
1.3 获取 Ashare
方式一:下载单文件(推荐)
# 克隆仓库
git clone https://github.com/mpquant/Ashare.git
# 或直接下载 Ashare.py 文件
# https://raw.githubusercontent.com/mpquant/Ashare/master/Ashare.py
方式二:复制代码
直接将 Ashare.py 文件复制到你的项目目录中即可使用。
1.4 验证安装
# 测试导入
from Ashare import get_price
# 测试获取数据
df = get_price('sh000001', count=5)
print(df)
# 如果输出类似以下内容,说明安装成功
# open close high low volume amount
# date
# 2024-03-15 3045.12 3085.67 3090.45 3040.23 45678900000 51234567890
2. 快速开始
2.1 第一个程序
from Ashare import *
# 获取上证指数最近10天的日线数据
df = get_price('sh000001', frequency='1d', count=10)
# 打印数据
print("上证指数最近10天行情:")
print(df)
# 打印最新收盘价
print(f"\n最新收盘价: {df['close'].iloc[-1]:.2f}")
2.2 获取股票数据
from Ashare import *
# 获取贵州茅台最近5天数据
df = get_price('sh600519', frequency='1d', count=5)
print("贵州茅台行情:")
print(df)
# 计算涨跌幅
last_close = df['close'].iloc[-1]
prev_close = df['close'].iloc[-2]
change_pct = (last_close - prev_close) / prev_close * 100
print(f"涨跌幅: {change_pct:.2f}%")
2.3 获取分钟数据
from Ashare import *
# 获取5分钟线数据
df = get_price('sh600519', frequency='5m', count=20)
print("5分钟K线数据:")
print(df)
3. 核心概念
3.1 股票代码格式
Ashare 支持多种股票代码格式:
| 格式 | 示例 | 说明 |
|---|---|---|
| 通达信 | sh600519 |
上证股票前缀 sh |
| 通达信 | sz000001 |
深证股票前缀 sz |
| 聚宽 | 600519.XSHG |
上证股票后缀 XSHG |
| 聚宽 | 000001.XSHE |
深证股票后缀 XSHE |
常用代码速查:
# === 指数 ===
'sh000001' # 上证指数
'sz399001' # 深证成指
'sz399006' # 创业板指
'sh000688' # 科创50
'sh000300' # 沪深300
# === 热门股票 ===
'sh600519' # 贵州茅台
'sz000001' # 平安银行
'sh601318' # 中国平安
'sz000858' # 五粮液
'sh600036' # 招商银行
'sz002594' # 比亚迪
'sh601012' # 隆基绿能
'sz300750' # 宁德时代
3.2 数据频率
| 频率 | 代码 | 说明 | 典型用途 |
|---|---|---|---|
| 1分钟 | 1m |
1分钟K线 | 日内交易 |
| 5分钟 | 5m |
5分钟K线 | 短线交易 |
| 15分钟 | 15m |
15分钟K线 | 短线交易 |
| 30分钟 | 30m |
30分钟K线 | 波段分析 |
| 60分钟 | 60m |
60分钟K线 | 波段分析 |
| 日线 | 1d |
日K线 | 中长线分析 |
| 周线 | 1w |
周K线 | 中长线分析 |
| 月线 | 1M |
月K线 | 长线分析 |
3.3 数据字段
返回的 DataFrame 包含标准 OHLCVA 数据:
┌────────┬──────┬───────┬──────┬─────┬────────┬────────┐
│ 日期 │ 开盘 │ 收盘 │ 最高 │ 最低│ 成交量 │ 成交额 │
├────────┼──────┼───────┼──────┼─────┼────────┼────────┤
│ date │ open │ close │ high │ low │ volume │ amount │
└────────┴──────┴───────┴──────┴─────┴────────┴────────┘
3.4 双数据源架构
┌─────────────┐
│ Ashare │
└──────┬──────┘
│
▼
┌──────────────────────────────────────┐
│ 数据源选择 │
└──────┬─────────────────────┬─────────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ 新浪财经 │ │ 腾讯股票 │
│ (主数据源) │ │ (备用源) │
└─────────────┘ └─────────────┘
│ │
└─────────┬───────────┘
│
▼
┌─────────────┐
│ pandas Data │
└─────────────┘
4. API 详解
4.1 get_price 函数
def get_price(code, end_date='', count=10, frequency='1d', fields=[]):
"""
获取股票历史行情数据
参数:
--------
code : str
股票代码,支持多种格式
- 'sh600519' (通达信格式)
- '000001.XSHG' (聚宽格式)
end_date : str, 可选
结束日期,默认为当前日期
- '2024-01-15' 或 '20240115'
count : int, 默认 10
获取的数据条数
frequency : str, 默认 '1d'
数据频率
- '1m', '5m', '15m', '30m', '60m' (分钟线)
- '1d' (日线)
- '1w' (周线)
- '1M' (月线)
fields : list, 可选
指定返回的字段列表
- 默认返回所有字段
- 可选: ['open', 'close', 'high', 'low', 'volume', 'amount']
返回:
--------
pandas.DataFrame
包含行情数据的数据框,索引为日期
"""
4.2 参数详解
code 参数
from Ashare import *
# 通达信格式(推荐)
df = get_price('sh600519') # 上证股票
df = get_price('sz000001') # 深证股票
# 聚宽格式
df = get_price('600519.XSHG') # 上证股票
df = get_price('000001.XSHE') # 深证股票
# 指数
df = get_price('sh000001') # 上证指数
df = get_price('sz399006') # 创业板指
end_date 参数
from Ashare import *
# 不指定:获取到最新日期
df = get_price('sh000001', count=10)
# 指定结束日期
df = get_price('sh000001', end_date='2024-01-15', count=10)
df = get_price('sh000001', end_date='20240115', count=10) # 紧凑格式
# 获取历史某段时间的数据
df = get_price('sh600519', end_date='2023-12-31', count=250)
count 参数
from Ashare import *
# 获取最近5条
df = get_price('sh000001', count=5)
# 获取最近100条(约5个月日线)
df = get_price('sh000001', count=100)
# 获取最近250条(约一年交易日)
df = get_price('sh000001', count=250)
# 分钟数据建议不要太多
df = get_price('sh000001', frequency='5m', count=200)
frequency 参数
from Ashare import *
# 日线(默认)
df_day = get_price('sh600519', frequency='1d', count=20)
# 分钟线
df_1m = get_price('sh600519', frequency='1m', count=100)
df_5m = get_price('sh600519', frequency='5m', count=100)
df_15m = get_price('sh600519', frequency='15m', count=50)
df_30m = get_price('sh600519', frequency='30m', count=30)
df_60m = get_price('sh600519', frequency='60m', count=20)
# 周线、月线
df_week = get_price('sh600519', frequency='1w', count=52)
df_month = get_price('sh600519', frequency='1M', count=24)
fields 参数
from Ashare import *
# 默认返回所有字段
df = get_price('sh600519', count=5)
print(df.columns) # ['open', 'close', 'high', 'low', 'volume', 'amount']
# 只获取需要的字段
df = get_price('sh600519', count=5, fields=['open', 'close'])
print(df.columns) # ['open', 'close']
# 只获取收盘价
df = get_price('sh600519', count=10, fields=['close'])
print(df)
5. 实战示例
5.1 获取股票基本信息
from Ashare import *
def get_stock_info(code):
"""获取股票最新信息"""
df = get_price(code, frequency='1d', count=2)
if df is None or len(df) < 2:
return None
today = df.iloc[-1]
yesterday = df.iloc[-2]
info = {
'code': code,
'open': today['open'],
'close': today['close'],
'high': today['high'],
'low': today['low'],
'volume': today['volume'],
'amount': today['amount'],
'change': today['close'] - yesterday['close'],
'change_pct': (today['close'] - yesterday['close']) / yesterday['close'] * 100,
'amplitude': (today['high'] - today['low']) / yesterday['close'] * 100
}
return info
# 使用示例
info = get_stock_info('sh600519')
print(f"股票代码: {info['code']}")
print(f"最新价: {info['close']:.2f}")
print(f"涨跌幅: {info['change_pct']:.2f}%")
print(f"振幅: {info['amplitude']:.2f}%")
5.2 计算技术指标
from Ashare import *
import pandas as pd
import numpy as np
def calculate_indicators(code, count=100):
"""计算常用技术指标"""
df = get_price(code, frequency='1d', count=count)
if df is None or len(df) < 30:
return None
# 移动平均线
df['MA5'] = df['close'].rolling(window=5).mean()
df['MA10'] = df['close'].rolling(window=10).mean()
df['MA20'] = df['close'].rolling(window=20).mean()
df['MA60'] = df['close'].rolling(window=60).mean()
# MACD
exp12 = df['close'].ewm(span=12, adjust=False).mean()
exp26 = df['close'].ewm(span=26, adjust=False).mean()
df['MACD'] = exp12 - exp26
df['Signal'] = df['MACD'].ewm(span=9, adjust=False).mean()
df['Histogram'] = df['MACD'] - df['Signal']
# RSI
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
df['RSI'] = 100 - (100 / (1 + rs))
# 布林带
df['BOLL_MID'] = df['close'].rolling(window=20).mean()
df['BOLL_STD'] = df['close'].rolling(window=20).std()
df['BOLL_UPPER'] = df['BOLL_MID'] + 2 * df['BOLL_STD']
df['BOLL_LOWER'] = df['BOLL_MID'] - 2 * df['BOLL_STD']
return df
# 使用示例
df = calculate_indicators('sh600519', count=100)
print(df[['close', 'MA5', 'MA20', 'MACD', 'RSI']].tail(10))
5.3 简单策略回测
from Ashare import *
import pandas as pd
def simple_ma_strategy(code, short_window=5, long_window=20, count=100):
"""简单的双均线策略"""
df = get_price(code, frequency='1d', count=count)
if df is None or len(df) < long_window:
return None
# 计算均线
df['MA_short'] = df['close'].rolling(window=short_window).mean()
df['MA_long'] = df['close'].rolling(window=long_window).mean()
# 生成信号
df['signal'] = 0
df.loc[df['MA_short'] > df['MA_long'], 'signal'] = 1
df.loc[df['MA_short'] < df['MA_long'], 'signal'] = -1
# 计算收益
df['returns'] = df['close'].pct_change()
df['strategy_returns'] = df['signal'].shift(1) * df['returns']
# 统计
total_return = (1 + df['strategy_returns'].dropna()).cumprod().iloc[-1] - 1
buy_hold_return = (1 + df['returns'].dropna()).cumprod().iloc[-1] - 1
return {
'code': code,
'total_trades': (df['signal'].diff() != 0).sum(),
'strategy_return': total_return * 100,
'buy_hold_return': buy_hold_return * 100,
'data': df
}
# 使用示例
result = simple_ma_strategy('sh600519', short_window=5, long_window=20, count=100)
print(f"股票: {result['code']}")
print(f"交易次数: {result['total_trades']}")
print(f"策略收益: {result['strategy_return']:.2f}%")
print(f"买入持有收益: {result['buy_hold_return']:.2f}%")
5.4 多股票对比分析
from Ashare import *
import pandas as pd
import time
def compare_stocks(codes, count=30):
"""多股票对比分析"""
data = {}
for code in codes:
df = get_price(code, frequency='1d', count=count)
if df is not None and len(df) > 0:
data[code] = df
time.sleep(0.1)
# 计算各股票指标
results = []
for code, df in data.items():
first_close = df['close'].iloc[0]
last_close = df['close'].iloc[-1]
max_price = df['high'].max()
min_price = df['low'].min()
results.append({
'code': code,
'start_price': first_close,
'end_price': last_close,
'return_pct': (last_close - first_close) / first_close * 100,
'max_price': max_price,
'min_price': min_price,
'amplitude': (max_price - min_price) / first_close * 100,
'avg_volume': df['volume'].mean()
})
return pd.DataFrame(results)
# 使用示例
stocks = ['sh600519', 'sz000858', 'sh601318', 'sz000001', 'sh600036']
df_compare = compare_stocks(stocks, count=30)
print(df_compare.sort_values('return_pct', ascending=False))
5.5 实时监控脚本
from Ashare import *
import time
from datetime import datetime
import os
def clear_screen():
"""清屏"""
os.system('cls' if os.name == 'nt' else 'clear')
def monitor_portfolio(codes, interval=10):
"""监控股票组合"""
while True:
try:
clear_screen()
print(f"股票监控 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 70)
print(f"{'代码':<12} {'最新价':>10} {'涨跌幅':>10} {'成交量':>15} {'成交额':>15}")
print("-" * 70)
for code in codes:
try:
df = get_price(code, frequency='1d', count=2)
if df is not None and len(df) >= 2:
today = df.iloc[-1]
yesterday = df.iloc[-2]
change_pct = (today['close'] - yesterday['close']) / yesterday['close'] * 100
# 涨跌颜色标记
arrow = '↑' if change_pct > 0 else '↓' if change_pct < 0 else '-'
print(f"{code:<12} {today['close']:>10.2f} {change_pct:>9.2f}% {arrow} "
f"{today['volume']:>15,.0f} {today['amount']:>15,.0f}")
else:
print(f"{code:<12} {'获取失败':>10}")
except Exception as e:
print(f"{code:<12} 错误: {e}")
time.sleep(0.1) # 避免请求过快
print("=" * 70)
print(f"刷新间隔: {interval}秒 | 按 Ctrl+C 退出")
except KeyboardInterrupt:
print("\n监控已停止")
break
except Exception as e:
print(f"错误: {e}")
time.sleep(interval)
# 使用示例
if __name__ == '__main__':
portfolio = ['sh600519', 'sz000858', 'sh601318', 'sz000001', 'sh600036',
'sz002594', 'sh601012', 'sz300750']
monitor_portfolio(portfolio, interval=10)
6. 进阶应用
6.1 与 pandas-ta 结合
from Ashare import get_price
import pandas_ta as ta
# 获取数据
df = get_price('sh600519', frequency='1d', count=100)
# 使用 pandas-ta 计算指标
df.ta.sma(length=20, append=True) # 简单移动平均
df.ta.ema(length=20, append=True) # 指数移动平均
df.ta.rsi(length=14, append=True) # RSI
df.ta.macd(append=True) # MACD
df.ta.bbands(append=True) # 布林带
print(df.tail())
6.2 数据持久化
from Ashare import get_price
import pandas as pd
import os
from datetime import datetime
class StockDataCache:
"""股票数据缓存类"""
def __init__(self, cache_dir='./stock_cache'):
self.cache_dir = cache_dir
os.makedirs(cache_dir, exist_ok=True)
def get(self, code, frequency='1d', count=100, force_refresh=False):
"""获取数据(带缓存)"""
cache_file = os.path.join(self.cache_dir, f"{code}_{frequency}.pkl")
# 检查缓存
if not force_refresh and os.path.exists(cache_file):
cache_time = datetime.fromtimestamp(os.path.getmtime(cache_file))
# 日线数据当天有效
if cache_time.date() == datetime.now().date():
return pd.read_pickle(cache_file)
# 获取新数据
df = get_price(code, frequency=frequency, count=count)
if df is not None:
df.to_pickle(cache_file)
return df
def clear_cache(self):
"""清除缓存"""
import shutil
if os.path.exists(self.cache_dir):
shutil.rmtree(self.cache_dir)
os.makedirs(self.cache_dir)
# 使用示例
cache = StockDataCache()
df = cache.get('sh600519', frequency='1d', count=100)
print(f"数据行数: {len(df)}")
6.3 异步批量获取
import asyncio
import aiohttp
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from Ashare import get_price
async def async_get_price(code, frequency='1d', count=10):
"""异步获取单只股票数据"""
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
df = await loop.run_in_executor(
executor,
get_price,
code, '', count, frequency
)
return code, df
async def batch_get_prices(codes, frequency='1d', count=10):
"""异步批量获取"""
tasks = [async_get_price(code, frequency, count) for code in codes]
results = await asyncio.gather(*tasks)
return dict(results)
# 使用示例
async def main():
stocks = ['sh600519', 'sz000858', 'sh601318', 'sz000001', 'sh600036']
data = await batch_get_prices(stocks, frequency='1d', count=10)
for code, df in data.items():
if df is not None:
print(f"{code}: {len(df)} 条数据, 最新价 {df['close'].iloc[-1]:.2f}")
# 运行
# asyncio.run(main())
6.4 与 matplotlib 可视化
from Ashare import get_price
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei']
plt.rcParams['axes.unicode_minus'] = False
def plot_kline(code, count=60):
"""绘制K线图"""
df = get_price(code, frequency='1d', count=count)
if df is None:
return
# 创建图形
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8),
gridspec_kw={'height_ratios': [3, 1]})
# 绘制K线
for i, (idx, row) in enumerate(df.iterrows()):
color = 'red' if row['close'] >= row['open'] else 'green'
# 绘制实体
ax1.bar(i, row['close'] - row['open'], bottom=row['open'], color=color, width=0.6)
# 绘制上下影线
ax1.vlines(i, row['low'], row['high'], color=color, linewidth=1)
# 设置标题和标签
ax1.set_title(f'{code} K线图', fontsize=14)
ax1.set_ylabel('价格')
ax1.grid(True, alpha=0.3)
# 绘制成交量
colors = ['red' if df['close'].iloc[i] >= df['open'].iloc[i] else 'green'
for i in range(len(df))]
ax2.bar(range(len(df)), df['volume'], color=colors, width=0.6)
ax2.set_ylabel('成交量')
ax2.grid(True, alpha=0.3)
# 设置x轴
ax2.set_xticks(range(0, len(df), 10))
ax2.set_xticklabels([df.index[i] for i in range(0, len(df), 10)], rotation=45)
plt.tight_layout()
plt.show()
# 使用示例
# plot_kline('sh600519', count=60)
7. 常见问题
Q1: 获取数据时返回空或 None
原因: - 股票代码格式不正确 - 网络连接问题 - 数据源暂时不可用
解决方案:
from Ashare import get_price
# 1. 检查代码格式
df = get_price('sh600519') # 正确 ✓
# df = get_price('600519') # 可能有问题 ✗
# 2. 添加异常处理
try:
df = get_price('sh600519', count=10)
if df is None or len(df) == 0:
print("未获取到数据,请检查代码或稍后重试")
except Exception as e:
print(f"错误: {e}")
Q2: 分钟数据获取不完整
原因: 数据源对分钟数据有限制
解决方案:
# 分批获取
from Ashare import get_price
import pandas as pd
def get_full_minute_data(code, frequency='5m', total_count=500):
batch_size = 200
all_data = []
# 分钟数据通常只能获取最近的几百条
df = get_price(code, frequency=frequency, count=min(batch_size, total_count))
return df
Q3: 如何获取实时涨跌幅
from Ashare import get_price
def get_realtime_change(code):
"""获取实时涨跌幅"""
df = get_price(code, frequency='1d', count=2)
if df is None or len(df) < 2:
return None
today = df.iloc[-1]
yesterday = df.iloc[-2]
change = today['close'] - yesterday['close']
change_pct = change / yesterday['close'] * 100
return {
'code': code,
'price': today['close'],
'change': change,
'change_pct': change_pct
}
# 使用
info = get_realtime_change('sh600519')
print(f"价格: {info['price']}, 涨跌: {info['change_pct']:.2f}%")
Q4: 数据有延迟怎么办
说明: 免费数据源存在 1-3 分钟延迟
解决方案: - 对于日内交易,可结合其他数据源 - 对于中长线策略,延迟影响较小
Q5: 如何获取更多字段
Ashare 仅提供 OHLCVA 基础数据。如需更多字段:
# 使用 AKShare 获取更多数据
import akshare as ak
# 实时行情(含市盈率、市净率等)
df = ak.stock_zh_a_spot_em()
stock_info = df[df['代码'] == '600519']
print(stock_info)
8. 最佳实践
8.1 错误处理
from Ashare import get_price
import time
def safe_get_price(code, max_retries=3, **kwargs):
"""安全的获取数据"""
for attempt in range(max_retries):
try:
df = get_price(code, **kwargs)
if df is not None and len(df) > 0:
return df
except Exception as e:
print(f"尝试 {attempt + 1}/{max_retries} 失败: {e}")
if attempt < max_retries - 1:
time.sleep(1)
return None
# 使用
df = safe_get_price('sh600519', count=10, frequency='1d')
if df is not None:
print("获取成功")
else:
print("获取失败")
8.2 请求频率控制
import time
from Ashare import get_price
def batch_get_with_delay(codes, delay=0.2, **kwargs):
"""带延迟的批量获取"""
results = {}
for code in codes:
df = get_price(code, **kwargs)
results[code] = df
time.sleep(delay) # 控制请求频率
return results
8.3 数据验证
def validate_stock_data(df):
"""验证数据有效性"""
if df is None or len(df) == 0:
return False, "数据为空"
required = ['open', 'close', 'high', 'low', 'volume']
for col in required:
if col not in df.columns:
return False, f"缺少字段: {col}"
if (df['high'] < df['low']).any():
return False, "最高价低于最低价的异常"
if (df['close'] > df['high']).any() or (df['close'] < df['low']).any():
return False, "收盘价超出最高最低价范围"
return True, "数据有效"
8.4 日志记录
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filename='stock_data.log'
)
logger = logging.getLogger(__name__)
def logged_get_price(code, **kwargs):
"""带日志的数据获取"""
logger.info(f"获取数据: {code}, 参数: {kwargs}")
try:
df = get_price(code, **kwargs)
if df is not None:
logger.info(f"获取成功: {code}, {len(df)} 条数据")
else:
logger.warning(f"获取失败: {code}, 返回空数据")
return df
except Exception as e:
logger.error(f"获取异常: {code}, 错误: {e}")
return None
9. 附录
9.1 常用指数代码
| 指数名称 | 代码 |
|---|---|
| 上证指数 | sh000001 |
| 深证成指 | sz399001 |
| 创业板指 | sz399006 |
| 科创50 | sh000688 |
| 沪深300 | sh000300 |
| 上证50 | sh000016 |
| 中证500 | sh000905 |
| 中证1000 | sh000852 |
9.2 常用股票代码
| 股票名称 | 代码 |
|---|---|
| 贵州茅台 | sh600519 |
| 平安银行 | sz000001 |
| 中国平安 | sh601318 |
| 五粮液 | sz000858 |
| 招商银行 | sh600036 |
| 比亚迪 | sz002594 |
| 宁德时代 | sz300750 |
| 中国中免 | sh601888 |
9.3 参考资源
- GitHub 仓库: https://github.com/mpquant/Ashare
- AKShare 文档: https://akshare.akfamily.xyz/
- Tushare 文档: https://tushare.pro/document/2
- Pandas 文档: https://pandas.pydata.org/docs/
教程更新时间: 2026-03-20