|
@@ -94,37 +94,6 @@ def find(data: tuple[Row, Iterable[str]]) -> str | None:
|
94
|
94
|
else:
|
95
|
95
|
return None
|
96
|
96
|
|
97
|
|
-def handleTx(tx_addr_group: Row):
|
98
|
|
-
|
99
|
|
-
|
100
|
|
- found_clusters: "RDD[str]" = clusters.rdd \
|
101
|
|
- .map(lambda cluster: (cluster, tx_addr_group['addresses'])) \
|
102
|
|
- .map(find) \
|
103
|
|
- .filter(lambda x: x != None)
|
104
|
|
-
|
105
|
|
-
|
106
|
|
- if(found_clusters.count() == 0):
|
107
|
|
- insertNewCluster(tx_addr_group)
|
108
|
|
- return
|
109
|
|
-
|
110
|
|
- cluster_roots = found_clusters.collect()
|
111
|
|
-
|
112
|
|
- cl = clusters \
|
113
|
|
- .select('addresses') \
|
114
|
|
- .where(
|
115
|
|
- F.col('parent').isin(cluster_roots)
|
116
|
|
- ) \
|
117
|
|
- .agg(F.collect_set('addresses').alias('agg')) \
|
118
|
|
- .select(F.flatten('agg').alias('addresses')) \
|
119
|
|
- .select(F.explode('addresses')) \
|
120
|
|
- .rdd \
|
121
|
|
- .map(lambda addr: (addr, cluster_roots[0])) \
|
122
|
|
- .toDF(['address', 'parent']) \
|
123
|
|
- .show()
|
124
|
|
- #.writeTo(CLUSTERS_TABLE) \
|
125
|
|
- #.append()
|
126
|
|
-
|
127
|
|
-
|
128
|
97
|
master = Master(config)
|
129
|
98
|
|
130
|
99
|
tx_addr_groups = master.group_tx_addrs()
|