|
@@ -49,7 +49,7 @@ master.spark.sparkContext.setCheckpointDir('./checkpoints') # spark is really a
|
49
|
49
|
|
50
|
50
|
tx_df = master.get_tx_dataframe()
|
51
|
51
|
|
52
|
|
-transaction_as_vertices = tx_df \
|
|
52
|
+addresses_as_vertices = tx_df \
|
53
|
53
|
.select('address') \
|
54
|
54
|
.withColumnRenamed('address', 'id') \
|
55
|
55
|
.distinct()
|
|
@@ -65,7 +65,7 @@ transactions_as_edges = tx_df \
|
65
|
65
|
.flatMap(explode_row) \
|
66
|
66
|
.toDF(['src', 'dst'])
|
67
|
67
|
|
68
|
|
-g = GraphFrame(transaction_as_vertices, transactions_as_edges)
|
|
68
|
+g = GraphFrame(addresses_as_vertices, transactions_as_edges)
|
69
|
69
|
components = g.connectedComponents(algorithm='graphframes')
|
70
|
70
|
|
71
|
71
|
master.write_connected_components_as_clusters(components)
|