working but very slow

master
nitowa, 2 years ago
parent commit 57e70a1fed
2 changed files with 33 additions and 27 deletions
  1. scratchpad.py (+10, -13)
  2. src/spark/main.py (+23, -14)

scratchpad.py (+10, -13)

@@ -1,5 +1,7 @@
+import time
 import sys
 import json
+from typing import Dict
 from cassandra.cluster import Cluster
 
 sys.path.append("config/db")
@@ -14,19 +16,14 @@ session = cluster.connect(config['cassandra_keyspace'])
 print(f"Connection OK")
 
 result = session.execute("SELECT * FROM clusters")
-print(result.all())
 
+map = dict()
 
-"""
-sc = pyspark.SparkContext('spark://osboxes:7077')
+for e in result.all():
+    if(e[1] not in map):
+        map[e[1]] = []
+
+    map[e[1]].append(e[0])
 
-data = sc.parallelize(list("aaa bbb cc dd e f"))
-counts = data \
-    .map(lambda x: (x, 1)) \
-    .reduceByKey(add) \
-    .sortBy(lambda x: x[1], ascending=False) \
-    .collect()
-
-for (word, count) in counts:
-    print("{}: {}".format(word, count))
-"""
+for key in map:
+    print(sorted(map[key]))
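
Note: the new scratchpad loop groups each address (e[0]) under its cluster parent (e[1]) and prints every group in sorted order. As an illustration only, not part of the commit, the same grouping can be written with collections.defaultdict, which drops the membership check and avoids shadowing the built-in map; it assumes the (address, parent) row layout implied by the diff and the Cassandra result from the script above:

from collections import defaultdict

# Assumed row layout from the diff: e[0] = address, e[1] = cluster parent.
groups = defaultdict(list)
for e in result.all():
    groups[e[1]].append(e[0])

# Print each cluster's addresses in sorted order, as the scratchpad does.
for parent in groups:
    print(sorted(groups[parent]))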

src/spark/main.py (+23, -14)

@@ -4,11 +4,13 @@ import json
 from sqlite3 import Row
 from typing import Iterable, List
 
-from pyspark import RDD
-
 from pyspark.sql import SparkSession, DataFrame, Row
 from pyspark.sql import functions as F
 
+import time
+start = time.time()
+
+
 config = json.load(open("./settings.json"))
 debug = config['debug']
 
@@ -63,18 +65,22 @@ class Master:
             .toDF(["tx_group", "index"])
 
     def rewrite_cluster_parent(self, cluster_roots: Iterable[str], new_cluster_root: str) -> None:
-        sqlstr = f"""
-            UPDATE {self.CLUSTERS_TABLE}
-            SET parent='{new_cluster_root}'
-            WHERE parent IN ({','.join(map(lambda r: f"'{r}'", cluster_roots))})"""
+        cluster_rewrite = self.spark \
+            .table(self.CLUSTERS_TABLE) \
+            .where(F.col('parent').isin(cluster_roots)) \
+            .select('address') \
+            .rdd \
+            .map(lambda addr: (addr['address'], new_cluster_root)) \
+            .toDF(['address', 'parent'])
 
         if(debug):
-            print("UPDATE SQL")
-            print(sqlstr)
+            print("REWRITE JOB")
+            cluster_rewrite.show(truncate=False, vertical=True)
             print()
 
-        self.spark.sql(sqlstr)
-
+        cluster_rewrite.writeTo(self.CLUSTERS_TABLE).append()
+
+
 # end class Master
 
 
@@ -104,7 +110,7 @@ for i in range(0, tx_addr_groups.count()):
 
     if(debug):
         print("KNOWN CLUSTERS")
-        cluster_addr_groups.show(truncate=False)
+        cluster_addr_groups.show(truncate=True)
         print()
 
     tx_addrs: Iterable[str] = tx_groups_indexed \
@@ -129,7 +135,7 @@ for i in range(0, tx_addr_groups.count()):
         print("cluster_tx_mapping")
         cluster_tx_mapping \
             .toDF(['cluster', 'tx']) \
-            .show(truncate=False)
+            .show(truncate=True)
         print()
 
 
@@ -145,7 +151,7 @@ for i in range(0, tx_addr_groups.count()):
 
 
     if(len(matched_roots) == 0):
-        new_root = master.insertNewCluster(tx_addrs)
+        master.insertNewCluster(tx_addrs)
     elif(len(matched_roots) == 1):
         master.insertNewCluster(tx_addrs, matched_roots[0])
     else:
@@ -153,4 +159,7 @@ for i in range(0, tx_addr_groups.count()):
         master.insertNewCluster(tx_addrs, matched_roots[0])
 
     if(debug):
-        print("==============")
+        print("======================================================================")
+
+end = time.time()
+print("ELAPSED TIME:", end-start)
