@@ -64,15 +64,9 @@ def cluster_step(clusters: "List[List[str]]", addresses: "List[List[str]]"):
 
     return cluster_step(clusters,addresses)
 
-
-def cluster_id_addresses_rows(iter: "Iterable[Row]") -> Iterable:
-    address_lists = list(map(lambda row: row['addresses'], iter))
-    yield cluster_step([], address_lists)
+def cluster_partition(iter: "Iterable[Row]") -> Iterable:
+    yield cluster_step([], list(map(lambda row: row['addresses'], iter)))
 
-def dud(iter):
-    address_lists = list(map(lambda row: row['addresses'], iter))
-    yield address_lists
-
 master = Master(config)
 master.spark.catalog.clearCache()
 master.spark.sparkContext.setCheckpointDir(config['spark_checkpoint_dir'])
@@ -84,14 +78,14 @@ tx_grouped = tx_df \
     .agg(F.collect_set('address').alias('addresses')) \
     .orderBy('tx_id') \
 
-print()
 res = tx_grouped \
     .repartition(5) \
     .rdd \
-    .mapPartitions(cluster_id_addresses_rows) \
+    .mapPartitions(cluster_partition) \
     .fold([], cluster_step)
 
 for cluster in res:
+    print()
     print(sorted(cluster))
 
 end = time.time()
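
For context on the pipeline above: the renamed cluster_partition generator yields one clustering per Spark partition, and .fold([], cluster_step) then merges those per-partition clusterings. The local sketch below only illustrates that shape; merge_clusters and the sample data are hypothetical stand-ins, written on the assumption that cluster_step unions address lists that share an address, and may differ from the actual cluster_step in this repo.

# Hypothetical stand-in for cluster_step: fold each address list into the
# running clusters, unioning clusters that share an address.
from typing import List

def merge_clusters(clusters: List[List[str]], addresses: List[List[str]]) -> List[List[str]]:
    result = [set(c) for c in clusters]
    for addr_list in addresses:
        group = set(addr_list)
        disjoint = []
        for cluster in result:
            if cluster & group:
                group |= cluster  # absorb any existing cluster sharing an address
            else:
                disjoint.append(cluster)
        disjoint.append(group)
        result = disjoint
    return [sorted(c) for c in result]

# Local equivalent of .mapPartitions(cluster_partition) followed by .fold([], cluster_step):
partitions = [[['a', 'b'], ['c']], [['b', 'd']]]                    # rows' address lists, per partition
per_partition = [merge_clusters([], part) for part in partitions]   # one clustering per partition
combined = []
for clustering in per_partition:                                     # fold merges partition results
    combined = merge_clusters(combined, clustering)
print(sorted(combined))                                              # [['a', 'b', 'd'], ['c']]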