import findspark
findspark.init()

from pyspark.sql import SparkSession

# Resource and serialization settings for this ad-hoc analysis session.
# Placeholder tokens (<--...-->) are filled in by the deployment templating.
_session_conf = {
    'java.library.path': "file:<--hdppath-->/hadoop/lib/native",
    'spark.driver.cores': "8",
    'spark.driver.memory': "8g",
    'spark.executor.cores': "8",
    'spark.executor.memory': "16g",
    'spark.executor.instances': "16",
    'spark.serializer': "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.warehouse.dir": "<--warehouse-location-->",
}

_builder = SparkSession.builder.master("yarn").appName("chris-adhoc2")
for _key, _value in _session_conf.items():
    _builder = _builder.config(_key, _value)

# Hive support is needed so the warehouse dir / metastore are usable.
spark = _builder.enableHiveSupport().getOrCreate()

# Push ORC filter predicates down to the storage layer for later reads.
spark.conf.set("spark.sql.orc.filterPushdown", "true")

# Quick look at the schema of the sessionized parquet dataset.
spark.read.parquet("<--data-location-->/sessionized").printSchema()
root
|-- clientId: string (nullable = true)
|-- duration: integer (nullable = true)
|-- accesses: integer (nullable = true)
|-- uniqUrls: integer (nullable = true)
|-- startTime: integer (nullable = true)
|-- endTime: integer (nullable = true)
|-- datehour: string (nullable = true)
# Register the sessionized dataset as a temp view for the SQL queries below.
# NOTE(review): this reads from <--output-location--> while the schema dump
# above used <--data-location--> -- confirm both resolve to the same dataset.
spark.read.parquet("<--output-location-->/sessionized").createOrReplaceTempView("sessionized")

# Session count per hour bucket. The aggregate is aliased so the resulting
# pandas column is `session_count` rather than the auto-generated `count(1)`.
spark.sql("""
select count(*) as session_count, datehour from sessionized
group by datehour
order by datehour
""").toPandas()
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
count(1)
datehour
0
14871
2015-07-22T03
1
24066
2015-07-22T05
2
21228
2015-07-22T07
3
32309
2015-07-22T09
4
63934
2015-07-22T10
5
62876
2015-07-22T11
6
131
2015-07-22T12
7
174
2015-07-22T13
8
84029
2015-07-22T16
9
40762
2015-07-22T17
10
76254
2015-07-22T18
11
97
2015-07-22T19
12
16825
2015-07-22T21
spark .sql ("""
select cast(avg(duration) as int), datehour from sessionized
group by datehour
order by datehour
""" ).toPandas ()
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
CAST(avg(duration) AS INT)
datehour
0
6
2015-07-22T03
1
9
2015-07-22T05
2
13
2015-07-22T07
3
13
2015-07-22T09
4
15
2015-07-22T10
5
26
2015-07-22T11
6
1
2015-07-22T12
7
2
2015-07-22T13
8
19
2015-07-22T16
9
18
2015-07-22T17
10
18
2015-07-22T18
11
0
2015-07-22T19
12
4
2015-07-22T21
spark .sql ("""
select cast(avg(duration) as int) from sessionized
""" ).toPandas ()
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
CAST(avg(duration) AS INT)
0
17
Unique URL visits per session
spark .sql ("""
select cast(avg(uniqUrls) as int), datehour from sessionized
group by datehour
order by datehour
""" ).toPandas ()
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
CAST(avg(uniqUrls) AS INT)
datehour
0
1
2015-07-22T03
1
2
2015-07-22T05
2
2
2015-07-22T07
3
2
2015-07-22T09
4
2
2015-07-22T10
5
2
2015-07-22T11
6
1
2015-07-22T12
7
1
2015-07-22T13
8
2
2015-07-22T16
9
2
2015-07-22T17
10
2
2015-07-22T18
11
1
2015-07-22T19
12
1
2015-07-22T21
spark .sql ("""
select cast(sum(uniqUrls) as int), datehour from sessionized
group by datehour
order by datehour
""" ).toPandas ()
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
CAST(sum(uniqUrls) AS INT)
datehour
0
24696
2015-07-22T03
1
57247
2015-07-22T05
2
58223
2015-07-22T07
3
89315
2015-07-22T09
4
178321
2015-07-22T10
5
179085
2015-07-22T11
6
144
2015-07-22T12
7
188
2015-07-22T13
8
181874
2015-07-22T16
9
90589
2015-07-22T17
10
160872
2015-07-22T18
11
97
2015-07-22T19
12
22935
2015-07-22T21
spark .sql ("""
select cast(avg(uniqUrls) as int)from sessionized
""" ).toPandas ()
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
CAST(avg(uniqUrls) AS INT)
0
2
spark .sql ("""
select cast(sum(uniqUrls) as int)from sessionized
""" ).toPandas ()
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
CAST(sum(uniqUrls) AS INT)
0
1043586
Ranking on total duration
spark .sql ("""
select sum(duration) as total_duration, clientId from sessionized
group by clientId
order by total_duration desc
limit 20
""" ).toPandas ()
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
total_duration
clientId
0
4726
203.191.34.178:10400
1
4112
54.169.191.85:15462
2
3026
52.74.219.71:58226
3
2948
52.74.219.71:33576
4
2743
52.74.219.71:46492
5
2358
119.81.61.166:35995
6
2334
119.81.61.166:37795
7
2325
119.81.61.166:39102
8
2323
119.81.61.166:40602
9
2291
52.74.219.71:59692
10
2264
52.74.219.71:54439
11
2225
52.74.219.71:57623
12
2089
52.74.219.71:51841
13
2086
52.74.219.71:53157
14
2065
103.29.159.138:57045
15
2065
213.239.204.204:35094
16
2064
78.46.60.71:58504
17
2064
52.74.219.71:59516
18
2064
52.74.219.71:57649
19
2060
103.29.159.186:27174
Ranking on total accesses
spark .sql ("""
select sum(accesses) as total_accesses, clientId from sessionized
group by clientId
order by total_accesses desc
limit 20
""" ).toPandas ()
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
total_accesses
clientId
0
1946
112.196.25.164:55986
1
1752
112.196.25.164:42792
2
1429
112.196.25.164:37516
3
768
54.169.191.85:15462
4
595
106.51.132.54:5048
5
550
54.169.191.85:3328
6
461
106.51.132.54:4508
7
324
106.51.132.54:5049
8
319
106.51.132.54:4489
9
275
14.102.53.58:4637
10
273
106.51.132.54:5037
11
255
106.51.132.54:4219
12
252
106.51.132.54:4974
13
251
106.51.132.54:4221
14
240
118.102.239.85:44128
15
240
106.51.132.54:4212
16
239
88.198.69.103:47828
17
237
78.46.60.71:58504
18
235
106.51.132.54:4235
19
234
213.239.204.204:35094
Ranking on total unique urls
spark .sql ("""
select sum(uniqUrls) as total_uniqUrls, clientId from sessionized
group by clientId
order by total_uniqUrls desc
limit 20
""" ).toPandas ()
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
total_uniqUrls
clientId
0
407
106.51.132.54:5048
1
320
106.51.132.54:5049
2
290
106.51.132.54:4508
3
271
106.51.132.54:5037
4
251
106.51.132.54:4974
5
239
88.198.69.103:47828
6
237
78.46.60.71:58504
7
234
213.239.204.204:35094
8
226
106.51.132.54:4841
9
186
106.51.132.54:5035
10
178
188.40.94.195:46918
11
164
144.76.99.19:58987
12
159
78.46.60.71:46308
13
154
188.40.135.194:43628
14
153
188.40.135.194:59090
15
150
176.9.154.132:50326
16
145
106.51.132.54:4489
17
127
66.249.71.110:41229
18
110
141.8.143.205:42162
19
107
141.8.143.205:57227