-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathrun_spark.py
More file actions
128 lines (104 loc) · 5.26 KB
/
run_spark.py
File metadata and controls
128 lines (104 loc) · 5.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import csv
import argparse
from datetime import datetime
import sys
from pyspark import SparkContext
import os
import logging.config
from core.TimeSeries import TimeSeries
from core.Experiment import Experiment
from forecasting_models.DummyPrevious import DummyPrevious
# from forecasting_models.Arima import Arima
from forecasting_models.AutoArima import AutoArima
from forecasting_models.ExpSmoothing import ExpSmoothing
# from forecasting_models.FBProphet import FBProphet
from forecasting_models.GPRegression import GPRegression
from forecasting_models.GradientBoostingDirective import GradientBoostingDirective
from forecasting_models.GradientBoostingRecursive import GradientBoostingRecursive
from forecasting_models.Lstm_Keras import Lstm_Keras
from forecasting_models.NeuralNetwork import NeuralNetwork
from forecasting_models.RandomForestDirective import RandomForestDirective
from forecasting_models.RandomForestRecursive import RandomForestRecursive
from forecasting_models.Revarb import Revarb
from forecasting_models.SvrDirective import SvrDirective
from forecasting_models.SvrRecursive import SvrRecursive
from utils.utils import set_csv_field_size_limit
# Each loaded series is cut down to at most this many leading observations;
# a value <= 0 disables the truncation.
MAXIMUM_OBSERVATIONS = 400

# When not None and > 0, this value is stored as every series'
# ``minimum_observations`` before the TimeSeries is built.
HISTORY_SIZE = 10
def run_experiment(_v):
    """Run one forecasting experiment for a single (time series, model) pair.

    Intended to be mapped over ``tss_rdd.cartesian(models_rdd)``, so ``_v``
    is a ``(time_series, model)`` pair.

    :param _v: pair whose element 0 is the time series and element 1 the model.
    :return: ``(model.name, result)`` where ``result`` is the value returned by
        ``Experiment.run()`` on that single series, with CSV writing disabled
        (the driver writes the CSVs itself).
    """
    time_series, model = _v[0], _v[1]
    # The name is read before the experiment runs, as in the original tuple expression.
    model_name = model.name
    experiment = Experiment(model=model, time_series=[time_series], csv_writing=False)
    return model_name, experiment.run()
def _load_time_series(input_trace_files, logger):
    """Read every time series out of the given CSV trace files.

    Each file must contain a ``values`` column holding whitespace-separated
    numbers; every other column of a row is passed to ``TimeSeries`` as a
    keyword argument. Series are cut to their first ``MAXIMUM_OBSERVATIONS``
    points when that constant is positive, and receive ``HISTORY_SIZE`` as
    ``minimum_observations`` when it is set and positive. Series that do not
    have more observations than ``ts.minimum_observations`` are logged and
    skipped.

    :param input_trace_files: paths of the CSV files to read.
    :param logger: logger used for progress messages and warnings.
    :return: list of the ``TimeSeries`` that were kept.
    Exits the process with status -1 if a file has no ``values`` column.
    """
    tss = []
    for input_trace_file in input_trace_files:
        input_file_path = os.path.abspath(input_trace_file)
        logger.info("Loading file: {}".format(input_file_path))
        # The csv module expects files to be opened with newline=''.
        with open(input_file_path, newline='') as csvfile:
            reader = csv.DictReader(csvfile)
            # fieldnames is None for an empty file, so guard before the membership test.
            if not reader.fieldnames or "values" not in reader.fieldnames:
                # Report why we stop instead of exiting silently.
                logger.error("No columns named 'values' inside the provided csv: {}".format(input_file_path))
                sys.exit(-1)
            for row in reader:
                # split() with no argument ignores repeated/leading/trailing whitespace;
                # split(" ") would produce empty strings and float("") would fail.
                observations = [float(x) for x in row.pop("values").split()]
                if MAXIMUM_OBSERVATIONS > 0:
                    observations = observations[:MAXIMUM_OBSERVATIONS]
                if HISTORY_SIZE is not None and HISTORY_SIZE > 0:
                    row["minimum_observations"] = HISTORY_SIZE
                ts = TimeSeries(observations=observations, **row)
                # Discard series that do not have enough observations.
                if len(ts.observations) <= ts.minimum_observations:
                    logger.warning("This TS has fewer points ({}) compared with the minimum observations {}"
                                   .format(len(ts.observations), ts.minimum_observations))
                else:
                    tss.append(ts)
    return tss


def _write_results(results, experiment_directory_path):
    """Write one ``predicted_<model_name>.csv`` file per model.

    :param results: iterable of ``(model_name, model_results)`` pairs. Each
        item of ``model_results`` is indexed with ``[0]`` and that element's
        ``to_csv()`` must return a dict (presumably the predicted TimeSeries
        returned by ``Experiment.run`` — confirm against that class).
    :param experiment_directory_path: existing directory the files go into.
    """
    for model_name, model_results in results:
        file_path = os.path.join(experiment_directory_path, "predicted_{}.csv".format(model_name))
        csv_writer = None
        with open(file_path, "w", newline='') as csv_file:
            for result in model_results:
                row_to_write = result[0].to_csv()
                # The header is derived from the keys of the first row written.
                if csv_writer is None:
                    csv_writer = csv.DictWriter(csv_file, fieldnames=row_to_write.keys(),
                                                delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
                    csv_writer.writeheader()
                csv_writer.writerow(row_to_write)


if __name__ == '__main__':
    script_dir = os.path.dirname(os.path.realpath(__file__))
    logging.config.fileConfig(os.path.join(script_dir, "logging.conf"))
    logger = logging.getLogger(__name__)

    parser = argparse.ArgumentParser(description="Experiment different TimeSeries Forecasting Models.")
    parser.add_argument(action='store', nargs='+', dest='input_trace_files', help='Input Trace File')
    # Default of 48 matches the partition count that used to be hard-coded.
    parser.add_argument("-p", "--parallelism", type=int, default=48,
                        action='store', dest='parallelism',
                        help='Number of partitions the time series are split into (default: 48)')
    args = parser.parse_args()
    if args.parallelism < 1:
        parser.error("--parallelism must be a positive integer")

    set_csv_field_size_limit()
    tss = _load_time_series(args.input_trace_files, logger)
    logger.info("Loaded {} TimeSeries".format(len(tss)))

    if not tss:
        logger.warning("No Time Series to process.")
    else:
        models_to_test = [
            DummyPrevious(),
            AutoArima(),
            ExpSmoothing(),
            SvrRecursive(),
            RandomForestRecursive(),
            GradientBoostingRecursive(),
            Lstm_Keras(),
            NeuralNetwork(),
            GPRegression(),
            Revarb(),
            # FBProphet()  (disabled together with its import at the top of the file)
        ]
        sc = SparkContext(appName="TimeSeriesForecasting")
        try:
            tss_rdd = sc.parallelize(tss, args.parallelism)
            # One partition per model.
            models_rdd = sc.parallelize(models_to_test, len(models_to_test))
            logger.info(
                "Partitions -> Models: {} TSS: {}".format(models_rdd.getNumPartitions(), tss_rdd.getNumPartitions()))
            # Every (time series, model) pair is run once, then grouped into
            # (model_name, results) pairs, one per distinct model name.
            res_rdd = tss_rdd.cartesian(models_rdd).map(run_experiment).groupByKey()

            date_format = "%Y-%m-%d-%H-%M-%S"
            experiment_directory_path = os.path.join(script_dir, "experiment_results",
                                                     datetime.now().strftime(date_format))
            os.makedirs(experiment_directory_path, exist_ok=True)

            # A single collect() replaces the per-model filter/collect jobs; it brings
            # all grouped results to the driver at once.
            _write_results(res_rdd.collect(), experiment_directory_path)
        finally:
            # Release the Spark context even if a job fails.
            sc.stop()
        logger.info("All Experiments finished. Results are in {}".format(experiment_directory_path))