Compare commits

..

3 Commits

Author SHA1 Message Date
10d87aefd3 Add tree learners to strategy evaluation directory 2020-11-04 15:15:24 -05:00
05db89e8c2 Implement first version of strategy learner
This version does not pass the automatic test.
2020-11-04 15:14:27 -05:00
c40ffcf84b Show both MACD and indicator strat on figure
Prepare for strategy learner.
2020-11-04 09:23:42 -05:00
9 changed files with 634 additions and 435 deletions

View File

@@ -0,0 +1,77 @@
import numpy as np
class AbstractTreeLearner:
LEAF = -1
NA = -1
def author(self):
return 'felixm' # replace tb34 with your Georgia Tech username
def create_node(self, factor, split_value, left, right):
return np.array([(factor, split_value, left, right), ],
dtype='|i4, f4, i4, i4')
def query_point(self, point):
node_index = 0
while self.rel_tree[node_index][0] != self.LEAF:
node = self.rel_tree[node_index]
split_factor = node[0]
split_value = node[1]
if point[split_factor] <= split_value:
# Recurse into left sub-tree.
node_index += node[2]
else:
node_index += node[3]
v = self.rel_tree[node_index][1]
return v
def query(self, points):
"""
@summary: Estimate a set of test points given the model we built.
@param points: should be a numpy array with each row corresponding to a specific query.
@returns the estimated values according to the saved model.
"""
query_point = lambda p: self.query_point(p)
r = np.apply_along_axis(query_point, 1, points)
return r
def build_tree(self, xs, y):
"""
@summary: Build a decision tree from the training data.
@param dataX: X values of data to add
@param dataY: the Y training values
"""
assert(xs.shape[0] == y.shape[0])
assert(xs.shape[0] > 0) # If this is 0 something went wrong.
if xs.shape[0] <= self.leaf_size:
value = np.mean(y)
if value < -0.3:
value = -1
elif value > 0.3:
value = 1
else:
value = 0
return self.create_node(self.LEAF, value, self.NA, self.NA)
if np.all(y[0] == y):
return self.create_node(self.LEAF, y[0], self.NA, self.NA)
i, split_value = self.get_i_and_split_value(xs, y)
select_l = xs[:, i] <= split_value
select_r = xs[:, i] > split_value
lt = self.build_tree(xs[select_l], y[select_l])
rt = self.build_tree(xs[select_r], y[select_r])
root = self.create_node(i, split_value, 1, lt.shape[0] + 1)
root = np.concatenate([root, lt, rt])
return root
def addEvidence(self, data_x, data_y):
"""
@summary: Add training data to learner
@param dataX: X values of data to add
@param dataY: the Y training values
"""
self.rel_tree = self.build_tree(data_x, data_y)

View File

@@ -0,0 +1,47 @@
import numpy as np
from AbstractTreeLearner import AbstractTreeLearner
class BagLearner(AbstractTreeLearner):
def __init__(self, learner, bags=9, boost=False, verbose=False, kwargs={}):
self.learner = learner
self.verbose = verbose
self.bags = bags
self.learners = [learner(**kwargs) for _ in range(bags)]
def get_bag(self, data_x, data_y):
num_items = int(data_x.shape[0] * 0.5) # 50% of samples
bag_x, bag_y = [], []
for _ in range(num_items):
i = np.random.randint(0, data_x.shape[0])
bag_x.append(data_x[i,:])
bag_y.append(data_y[i])
return np.array(bag_x), np.array(bag_y)
def addEvidence(self, data_x, data_y):
"""
@summary: Add training data to learner
@param dataX: X values of data to add
@param dataY: the Y training values
"""
for learner in self.learners:
x, y = self.get_bag(data_x, data_y)
learner.addEvidence(x, y)
def query(self, points):
"""
@summary: Estimate a set of test points given the model we built.
@param points: numpy array with each row corresponding to a query.
@returns the estimated values according to the saved model.
"""
def to_discret(m):
print(m)
if m < -0.5:
return -1
elif m > 0.5:
return 1
return 0
m = np.mean([l.query(points) for l in self.learners], axis=0)
return m
# return np.apply_along_axis(to_discret, 1, m)

View File

@@ -36,18 +36,21 @@ class ManualStrategy:
print(volume) print(volume)
def macd_strat(self, macd, orders): def macd_strat(self, macd, orders):
"""Strategy based on MACD cross."""
def strat(ser): def strat(ser):
m = macd.loc[ser.index] m = macd.loc[ser.index]
prev_macd, prev_signal = m.iloc[0] prev_macd, prev_signal, _ = m.iloc[0]
cur_macd, cur_signal = m.iloc[1] cur_macd, cur_signal, _ = m.iloc[1]
shares = 0 shares = 0
if cur_macd < -1 and prev_macd < prev_signal and cur_macd > cur_signal: if cur_macd < -1 and prev_macd < prev_signal \
and cur_macd > cur_signal:
if self.holding == 0: if self.holding == 0:
shares = 1000 shares = 1000
elif self.holding == -1000: elif self.holding == -1000:
shares = 2000 shares = 2000
elif cur_macd > 1 and prev_macd > prev_signal and cur_macd < cur_signal: elif cur_macd > 1 and prev_macd > prev_signal \
and cur_macd < cur_signal:
if self.holding == 0: if self.holding == 0:
shares = -1000 shares = -1000
elif self.holding == 1000: elif self.holding == 1000:
@@ -58,6 +61,8 @@ class ManualStrategy:
orders['Shares'] = orders['Shares'].rolling(2).apply(strat) orders['Shares'] = orders['Shares'].rolling(2).apply(strat)
def three_indicator_strat(self, macd, rsi, price_sma, orders): def three_indicator_strat(self, macd, rsi, price_sma, orders):
"""Strategy based on three indicators. Thresholds selected based on
scatter plots."""
def strat(row): def strat(row):
shares = 0 shares = 0
_, _, macd_diff = macd.loc[row.name] _, _, macd_diff = macd.loc[row.name]
@@ -87,7 +92,7 @@ class ManualStrategy:
def testPolicy(self, symbol="IBM", def testPolicy(self, symbol="IBM",
sd=dt.datetime(2009, 1, 1), sd=dt.datetime(2009, 1, 1),
ed=dt.datetime(2010, 1, 1), ed=dt.datetime(2010, 1, 1),
sv=10000): sv=10000, macd_strat=False):
self.holding = 0 self.holding = 0
df = util.get_data([symbol], pd.date_range(sd, ed)) df = util.get_data([symbol], pd.date_range(sd, ed))
@@ -102,7 +107,8 @@ class ManualStrategy:
rsi = indicators.rsi(df, symbol) rsi = indicators.rsi(df, symbol)
price_sma = indicators.price_sma(df, symbol, [8]) price_sma = indicators.price_sma(df, symbol, [8])
# self.macd_strat(macd, orders) if macd_strat:
self.macd_strat(macd, orders)
else:
self.three_indicator_strat(macd, rsi, price_sma, orders) self.three_indicator_strat(macd, rsi, price_sma, orders)
return orders return orders

View File

@@ -0,0 +1,30 @@
import numpy as np
from AbstractTreeLearner import AbstractTreeLearner
class RTLearner(AbstractTreeLearner):
def __init__(self, leaf_size = 1, verbose = False):
self.leaf_size = leaf_size
self.verbose = verbose
def get_i_and_split_value(self, xs, y):
"""
@summary: Pick a random i and split value.
Make sure that not all X are the same for i and also pick
different values to average the split_value from.
"""
i = np.random.randint(0, xs.shape[1])
while np.all(xs[0,i] == xs[:,i]):
i = np.random.randint(0, xs.shape[1])
# I don't know about the performance of this, but at least it
# terminates reliably. If the two elements are the same something is
# wrong.
a = np.array(list(set(xs[:, i])))
r1, r2 = np.random.choice(a, size = 2, replace = False)
assert(r1 != r2)
split_value = (r1 + r2) / 2.0
return i, split_value

View File

@@ -1,88 +1,94 @@
"""
Template for implementing StrategyLearner (c) 2016 Tucker Balch
Copyright 2018, Georgia Institute of Technology (Georgia Tech)
Atlanta, Georgia 30332
All Rights Reserved
Template code for CS 4646/7646
Georgia Tech asserts copyright ownership of this template and all derivative
works, including solutions to the projects assigned in this course. Students
and other users of this template code are advised not to share it with others
or to make it available on publicly viewable websites including repositories
such as github and gitlab. This copyright statement should not be removed
or edited.
We do grant permission to share solutions privately with non-students such
as potential employers. However, sharing with other current or future
students of CS 7646 is prohibited and subject to being investigated as a
GT honor code violation.
-----do not edit anything above this line---
Student Name: Tucker Balch (replace with your name)
GT User ID: tb34 (replace with your User ID)
GT ID: 900897987 (replace with your GT ID)
"""
import datetime as dt import datetime as dt
import pandas as pd import pandas as pd
import util as ut import util
import indicators
from BagLearner import BagLearner
from RTLearner import RTLearner
class StrategyLearner(object): class StrategyLearner(object):
# constructor def __init__(self, verbose=False, impact=0.0, commission=0.0, testing=False):
def __init__(self, verbose = False, impact=0.0, commission=0.0):
self.verbose = verbose self.verbose = verbose
self.impact = impact self.impact = impact
self.commission = commission self.commission = commission
self.testing = testing
# this method should create a QLearner, and train it for trading def _get_volume(self):
def addEvidence(self, symbol = "IBM", \ """For reference."""
sd=dt.datetime(2008,1,1), \ volume_all = ut.get_data(syms, dates, colname="Volume")
ed=dt.datetime(2009,1,1), \
sv = 10000):
# add your code to do learning here
# example usage of the old backward compatible util function
syms=[symbol]
dates = pd.date_range(sd, ed)
prices_all = ut.get_data(syms, dates) # automatically adds SPY
prices = prices_all[syms] # only portfolio symbols
# prices_SPY = prices_all['SPY'] # only SPY, for comparison later
if self.verbose: print(prices)
# example use with new colname
volume_all = ut.get_data(syms, dates, colname = "Volume") # automatically adds SPY
volume = volume_all[syms] # only portfolio symbols volume = volume_all[syms] # only portfolio symbols
# volume_SPY = volume_all['SPY'] # only SPY, for comparison later # volume_SPY = volume_all['SPY'] # only SPY, for comparison later
if self.verbose: print(volume) if self.verbose:
print(volume)
# this method should use the existing policy and test it against new data def _add_indicators(self, df, symbol):
def testPolicy(self, symbol = "IBM", \ """Add indicators for learning to DataFrame."""
sd=dt.datetime(2009,1,1), \ df.drop(columns=["SPY"], inplace=True)
ed=dt.datetime(2010,1,1), \ indicators.macd(df, symbol)
indicators.rsi(df, symbol)
indicators.price_sma(df, symbol, [8])
indicators.price_delta(df, symbol, 3)
df.dropna(inplace=True)
def addEvidence(self, symbol="IBM",
sd=dt.datetime(2008, 1, 1),
ed=dt.datetime(2009, 1, 1),
sv=10000): sv=10000):
# here we build a fake set of trades self.indicators = ['macd_diff', 'rsi', 'price_sma_8']
# your code should return the same sort of data df = util.get_data([symbol], pd.date_range(sd, ed))
dates = pd.date_range(sd, ed) self._add_indicators(df, symbol)
prices_all = ut.get_data([symbol], dates) # automatically adds SPY
trades = prices_all[[symbol,]] # only portfolio symbols def classify_y(row):
# trades_SPY = prices_all['SPY'] # only SPY, for comparison later if row > 0.1:
trades.values[:,:] = 0 # set them all to nothing return 1
trades.values[0,:] = 1000 # add a BUY at the start elif row < -0.1:
trades.values[40,:] = -1000 # add a SELL return -1
trades.values[41,:] = 1000 # add a BUY return 0
trades.values[60,:] = -2000 # go short from long
trades.values[61,:] = 2000 # go long from short self.learner = RTLearner(leaf_size = 7)
trades.values[-1,:] = -1000 #exit on the last day # self.learner = BagLearner(RTLearner, 5, {'leaf_size': 5})
if self.verbose: print(type(trades)) # it better be a DataFrame! data_x = df[self.indicators].to_numpy()
if self.verbose: print(trades) y = df['pct_3'].apply(classify_y)
if self.verbose: print(prices_all) self.learner.addEvidence(data_x, y.to_numpy())
return trades return y
def strat(self, data_y, orders):
self.holding = 0
def strat(row):
y = int(data_y.loc[row.name][0])
shares = 0
if self.holding == 0 and y == 1:
shares = 1000
elif self.holding == -1000 and y == 1:
shares = 2000
elif self.holding == 0 and y == -1:
shares = -1000
elif self.holding == 1000 and y == -1:
shares = -2000
self.holding += shares
return shares
orders["Shares"] = orders.apply(strat, axis=1)
def testPolicy(self, symbol="IBM",
sd=dt.datetime(2009, 1, 1),
ed=dt.datetime(2010, 1, 1),
sv=10000):
df = util.get_data([symbol], pd.date_range(sd, ed))
self._add_indicators(df, symbol)
data_x = df[self.indicators].to_numpy()
data_y = pd.DataFrame(index=df.index, data=self.learner.query(data_x))
orders = pd.DataFrame(index=df.index)
orders["Symbol"] = symbol
orders["Order"] = ""
orders["Shares"] = 0
self.strat(data_y, orders)
if self.testing:
return orders
else:
return orders[["Shares"]]
if __name__=="__main__":
print("One does not simply think up a strategy")

View File

@@ -9,6 +9,7 @@ import matplotlib.pyplot as plt
from matplotlib.widgets import MultiCursor from matplotlib.widgets import MultiCursor
from BenchmarkStrategy import BenchmarkStrategy from BenchmarkStrategy import BenchmarkStrategy
from ManualStrategy import ManualStrategy from ManualStrategy import ManualStrategy
from StrategyLearner import StrategyLearner
def plot_indicators(symbol, df): def plot_indicators(symbol, df):
@@ -16,7 +17,6 @@ def plot_indicators(symbol, df):
price_sma = indicators.price_sma(df, symbol, [8]) price_sma = indicators.price_sma(df, symbol, [8])
bb = indicators.bollinger_band(df, symbol) bb = indicators.bollinger_band(df, symbol)
sma = indicators.sma(df, symbol, [8])
rsi = indicators.rsi(df, symbol) rsi = indicators.rsi(df, symbol)
macd = indicators.macd(df, symbol).copy() macd = indicators.macd(df, symbol).copy()
@@ -57,41 +57,81 @@ def visualize_correlations(symbol, df):
sys.exit(0) sys.exit(0)
def experiment1(): def compare_manual_strategies(symbol, sv, sd, ed):
symbol = "JPM"
start_value = 10000
sd = dt.datetime(2008, 1, 1) # in-sample
ed = dt.datetime(2009, 12, 31) # in-sample
# sd = dt.datetime(2010, 1, 1) # out-sample
# ed = dt.datetime(2011, 12, 31) # out-sample
df = util.get_data([symbol], pd.date_range(sd, ed)) df = util.get_data([symbol], pd.date_range(sd, ed))
df.drop(columns=["SPY"], inplace=True) df.drop(columns=["SPY"], inplace=True)
# visualize_correlations(symbol, df)
# plot_indicators(symbol, df)
bs = BenchmarkStrategy() bs = BenchmarkStrategy()
orders = bs.testPolicy(symbol, sd, ed, start_value) orders = bs.testPolicy(symbol, sd, ed, sv)
df["Benchmark"] = marketsim.compute_portvals(orders, start_value) df["Benchmark"] = marketsim.compute_portvals(orders, sv)
df["Orders Benchmark"] = orders["Shares"] df["Orders Benchmark"] = orders["Shares"]
ms = ManualStrategy() ms = ManualStrategy()
orders = ms.testPolicy(symbol, sd, ed, start_value) orders = ms.testPolicy(symbol, sd, ed, sv, macd_strat=True)
df["Manual"] = marketsim.compute_portvals(orders, start_value) df["MACD Strat"] = marketsim.compute_portvals(orders, sv)
df["Orders Manual"] = orders["Shares"] df["Orders MACD"] = orders["Shares"]
df["Holding Manual"] = orders["Shares"].cumsum() # df["Holding Manual"] = orders["Shares"].cumsum()
orders = ms.testPolicy(symbol, sd, ed, sv)
df["Three Strat"] = marketsim.compute_portvals(orders, sv)
df["Orders Three"] = orders["Shares"]
fig, ax = plt.subplots(3, sharex=True) fig, ax = plt.subplots(3, sharex=True)
df[[symbol]].plot(ax=ax[0]) df[[symbol]].plot(ax=ax[0])
df[["Benchmark", "Manual"]].plot(ax=ax[1]) df[["Benchmark", "MACD Strat", "Three Strat"]].plot(ax=ax[1])
df[["Orders Benchmark", "Orders Manual"]].plot(ax=ax[2]) df[["Orders Benchmark", "Orders MACD", "Orders Three"]].plot(ax=ax[2])
for a in ax: for a in ax:
a.grid() a.grid()
multi = MultiCursor(fig.canvas, ax, color='r', lw=0.5) MultiCursor(fig.canvas, ax, color='r', lw=0.5)
# plt.show()
fig.set_size_inches(10, 8, forward=True)
plt.savefig('figure_1.png', dpi=fig.dpi)
def experiment1():
symbol = "JPM"
sv = 10000
sd = dt.datetime(2008, 1, 1) # in-sample
ed = dt.datetime(2009, 12, 31) # in-sample
sd_out = dt.datetime(2010, 1, 1) # out-sample
ed_out = dt.datetime(2011, 12, 31) # out-sample
df = util.get_data([symbol], pd.date_range(sd, ed_out))
df.drop(columns=["SPY"], inplace=True)
# visualize_correlations(symbol, df)
# plot_indicators(symbol, df)
# compare_manual_strategies(symbol, sv, sd, ed)
bs = BenchmarkStrategy()
orders = bs.testPolicy(symbol, sd_out, ed_out, sv)
df["Benchmark"] = marketsim.compute_portvals(orders, sv)
df["Orders Benchmark"] = orders["Shares"]
sl = StrategyLearner(testing=True)
sl.addEvidence(symbol, sd, ed, sv)
orders = sl.testPolicy(symbol, sd_out, ed_out, sv)
df["SL"] = marketsim.compute_portvals(orders, sv)
df["Orders SL"] = orders["Shares"]
fig, ax = plt.subplots(3, sharex=True)
df[[symbol]].plot(ax=ax[0])
df[["Benchmark", "SL"]].plot(ax=ax[1])
df[["Orders Benchmark", "Orders SL"]].plot(ax=ax[2])
for a in ax:
a.grid()
MultiCursor(fig.canvas, ax, color='r', lw=0.5)
plt.show() plt.show()
# plt.savefig('figure_1.png')
# For debugging the classification learner:
# df["y_train"] = sl.addEvidence(symbol, sd, ed, sv)
# df["y_query"] = sl.testPolicy(symbol, sd, ed, sv)
# df[["y_train", "y_query"]].plot(ax=ax[1])
if __name__ == "__main__": if __name__ == "__main__":

Binary file not shown.

Before

Width:  |  Height:  |  Size: 68 KiB

After

Width:  |  Height:  |  Size: 112 KiB

View File

@@ -73,7 +73,7 @@ def rsi(df, symbol, period=14):
(avg_loss / period)))) (avg_loss / period))))
return rsi return rsi
key = f"rsi" key = "rsi"
# Add one to get 'period' price changes (first change is nan). # Add one to get 'period' price changes (first change is nan).
period += 1 period += 1
df[key] = df[symbol].rolling(period).apply(rsi) df[key] = df[symbol].rolling(period).apply(rsi)
@@ -91,13 +91,6 @@ def macd(df, symbol):
return df[[k1, k2, k3]] return df[[k1, k2, k3]]
def price_delta(df, symbol, period=1):
"""Calculate delta between previous day and today."""
k = f"diff_{period}"
df[k] = df[symbol].diff(periods=period)
return df[k]
def price_delta(df, symbol, period=1): def price_delta(df, symbol, period=1):
"""Calculate percentage change for period.""" """Calculate percentage change for period."""
k = f"pct_{period}" k = f"pct_{period}"