forked from devAmoghS/Machine-Learning-with-Python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
316 lines (221 loc) · 9.17 KB
/
Copy pathutils.py
File metadata and controls
316 lines (221 loc) · 9.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
import math
import random
from collections import Counter, defaultdict
from functools import partial
import matplotlib.pyplot as plt
from helpers.linear_algebra import shape, get_column, make_matrix, vector_sum, dot, magnitude, vector_subtract, scalar_multiply
from helpers.probability import inverse_normal_cdf
from helpers.stats import correlation, standard_deviation, mean
from helpers.gradient_descent import maximize_stochastic, maximize_batch
def bucketize(point, bucket_size):
"""floor the point to the next lower multiple of bucket_size"""
return bucket_size * math.floor(point / bucket_size)
def make_histogram(points, bucket_size):
"""buckets the points and counts how many in each bucket"""
return Counter(bucketize(point, bucket_size) for point in points)
def plot_histogram(points, bucket_size, title=""):
histogram = make_histogram(points, bucket_size)
plt.bar(histogram.keys(), histogram.values(), width=bucket_size)
plt.title(title)
plt.show()
def compare_two_distributions():
random.seed(0)
uniform = [random.randrange(-100,101) for _ in range(200)]
normal = [57 * inverse_normal_cdf(random.random())
for _ in range(200)]
plot_histogram(uniform, 10, "Uniform Histogram")
plot_histogram(normal, 10, "Normal Histogram")
def random_normal():
"""returns a random draw from a standard normal distribution"""
return inverse_normal_cdf(random.random())
xs = [random_normal() for _ in range(1000)]
ys1 = [ x + random_normal() / 2 for x in xs]
ys2 = [-x + random_normal() / 2 for x in xs]
def scatter():
plt.scatter(xs, ys1, marker='.', color='black', label='ys1')
plt.scatter(xs, ys2, marker='.', color='gray', label='ys2')
plt.xlabel('xs')
plt.ylabel('ys')
plt.legend(loc=9)
plt.show()
def correlation_matrix(data):
"""returns the num_columns x num_columns matrix whose (i, j)th entry
is the correlation between columns i and j of data"""
_, num_columns = shape(data)
def matrix_entry(i, j):
return correlation(get_column(data, i), get_column(data, j))
return make_matrix(num_columns, num_columns, matrix_entry)
def make_scatterplot_matrix():
# first, generate some random data
num_points = 100
def random_row():
row = [None, None, None, None]
row[0] = random_normal()
row[1] = -5 * row[0] + random_normal()
row[2] = row[0] + row[1] + 5 * random_normal()
row[3] = 6 if row[2] > -2 else 0
return row
random.seed(0)
data = [random_row()
for _ in range(num_points)]
# then plot it
_, num_columns = shape(data)
fig, ax = plt.subplots(num_columns, num_columns)
for i in range(num_columns):
for j in range(num_columns):
# scatter column_j on the x-axis vs column_i on the y-axis
if i != j: ax[i][j].scatter(get_column(data, j), get_column(data, i))
# unless i == j, in which case show the series name
else: ax[i][j].annotate("series " + str(i), (0.5, 0.5),
xycoords='axes fraction',
ha="center", va="center")
# then hide axis labels except left and bottom charts
if i < num_columns - 1: ax[i][j].xaxis.set_visible(False)
if j > 0: ax[i][j].yaxis.set_visible(False)
# fix the bottom right and top left axis labels, which are wrong because
# their charts only have text in them
ax[-1][-1].set_xlim(ax[0][-1].get_xlim())
ax[0][0].set_ylim(ax[0][1].get_ylim())
plt.show()
def parse_row(input_row, parsers):
"""given a list of parsers (some of which may be None)
apply the appropriate one to each element of the input_row"""
return [parser(value) if parser is not None else value
for value, parser in zip(input_row, parsers)]
def parse_rows_with(reader, parsers):
"""wrap a reader to apply the parsers to each of its rows"""
for row in reader:
yield parse_row(row, parsers)
def try_or_none(f):
"""wraps f to return None if f raises an exception
assumes f takes only one input"""
def f_or_none(x):
try: return f(x)
except: return None
return f_or_none
def parse_row(input_row, parsers):
return [try_or_none(parser)(value) if parser is not None else value
for value, parser in zip(input_row, parsers)]
def try_parse_field(field_name, value, parser_dict):
"""try to parse value using the appropriate function from parser_dict"""
parser = parser_dict.get(field_name) # None if no such entry
if parser is not None:
return try_or_none(parser)(value)
else:
return value
def parse_dict(input_dict, parser_dict):
return { field_name : try_parse_field(field_name, value, parser_dict)
for field_name, value in input_dict.items() }
#
#
# MANIPULATING DATA
#
#
def picker(field_name):
"""returns a function that picks a field out of a dict"""
return lambda row: row[field_name]
def pluck(field_name, rows):
"""turn a list of dicts into the list of field_name values"""
return map(picker(field_name), rows)
def group_by(grouper, rows, value_transform=None):
# key is output of grouper, value is list of rows
grouped = defaultdict(list)
for row in rows:
grouped[grouper(row)].append(row)
if value_transform is None:
return grouped
else:
return { key : value_transform(rows)
for key, rows in grouped.items() }
def percent_price_change(yesterday, today):
return today["closing_price"] / yesterday["closing_price"] - 1
def day_over_day_changes(grouped_rows):
# sort the rows by date
ordered = sorted(grouped_rows, key=picker("date"))
# zip with an offset to get pairs of consecutive days
return [{ "symbol" : today["symbol"],
"date" : today["date"],
"change" : percent_price_change(yesterday, today) }
for yesterday, today in zip(ordered, ordered[1:])]
#
#
# RESCALING DATA
#
#
def scale(data_matrix):
num_rows, num_cols = shape(data_matrix)
means = [mean(get_column(data_matrix,j))
for j in range(num_cols)]
stdevs = [standard_deviation(get_column(data_matrix,j))
for j in range(num_cols)]
return means, stdevs
def rescale(data_matrix):
"""rescales the input data so that each column
has mean 0 and standard deviation 1
ignores columns with no deviation"""
means, stdevs = scale(data_matrix)
def rescaled(i, j):
if stdevs[j] > 0:
return (data_matrix[i][j] - means[j]) / stdevs[j]
else:
return data_matrix[i][j]
num_rows, num_cols = shape(data_matrix)
return make_matrix(num_rows, num_cols, rescaled)
def de_mean_matrix(A):
"""returns the result of subtracting from every value in A the mean
value of its column. the resulting matrix has mean 0 in every column"""
nr, nc = shape(A)
column_means, _ = scale(A)
return make_matrix(nr, nc, lambda i, j: A[i][j] - column_means[j])
def direction(w):
mag = magnitude(w)
return [w_i / mag for w_i in w]
def directional_variance_i(x_i, w):
"""the variance of the row x_i in the direction w"""
return dot(x_i, direction(w)) ** 2
def directional_variance(X, w):
"""the variance of the data in the direction w"""
return sum(directional_variance_i(x_i, w) for x_i in X)
def directional_variance_gradient_i(x_i, w):
"""the contribution of row x_i to the gradient of
the direction-w variance"""
projection_length = dot(x_i, direction(w))
return [2 * projection_length * x_ij for x_ij in x_i]
def directional_variance_gradient(X, w):
return vector_sum(directional_variance_gradient_i(x_i,w) for x_i in X)
def first_principal_component(X):
guess = [1 for _ in X[0]]
unscaled_maximizer = maximize_batch(
partial(directional_variance, X), # is now a function of w
partial(directional_variance_gradient, X), # is now a function of w
guess)
return direction(unscaled_maximizer)
def first_principal_component_sgd(X):
guess = [1 for _ in X[0]]
unscaled_maximizer = maximize_stochastic(
lambda x, _, w: directional_variance_i(x, w),
lambda x, _, w: directional_variance_gradient_i(x, w),
X, [None for _ in X], guess)
return direction(unscaled_maximizer)
def project(v, w):
"""return the projection of v onto w"""
coefficient = dot(v, w)
return scalar_multiply(coefficient, w)
def remove_projection_from_vector(v, w):
"""projects v onto w and subtracts the result from v"""
return vector_subtract(v, project(v, w))
def remove_projection(X, w):
"""for each row of X
projects the row onto w, and subtracts the result from the row"""
return [remove_projection_from_vector(x_i, w) for x_i in X]
def principal_component_analysis(X, num_components):
components = []
for _ in range(num_components):
component = first_principal_component(X)
components.append(component)
X = remove_projection(X, component)
return components
def transform_vector(v, components):
return [dot(v, w) for w in components]
def transform(X, components):
return [transform_vector(x_i, components) for x_i in X]