1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- from typing import Iterable, List, Set
-
def merge_lists_distinct(*lists: "Iterable[str]") -> List[str]:
    """Merge any number of iterables into one list without duplicates.

    Elements are returned in first-seen order (scanning the arguments left to
    right), so the result is reproducible from run to run instead of
    depending on set iteration order.

    Args:
        *lists: Zero or more iterables of hashable items.

    Returns:
        A new list holding each distinct item exactly once. Empty when no
        arguments are given or all of them are empty.
    """
    # A dict keeps insertion order and ignores repeated keys, which gives an
    # order-preserving de-duplication in a single pass over the input.
    seen = {}
    for lst in lists:
        seen.update(dict.fromkeys(lst))
    return list(seen)
-
def check_lists_overlap(list1, list2):
    """Report whether the two collections share at least one element.

    Each item of ``list2`` is tested for membership in ``list1``; the scan
    stops at the first hit.

    Returns:
        True if some item of ``list2`` is contained in ``list1``, else False
        (including when either collection is empty).
    """
    for candidate in list2:
        if candidate in list1:
            return True
    return False
-
def cluster_step(clusters: "List[List[str]]", addresses: "List[List[str]]"):
    """Fold address groups into clusters, merging every cluster a group touches.

    The address groups are processed in order. For each group, every current
    cluster that shares at least one element with it is removed and combined
    with the group into a single new cluster, which is appended after the
    clusters that were left untouched. Existing clusters are only ever merged
    through an address group; they are not compared with each other.

    The work is done in a loop rather than by recursing once per address
    group, so long inputs cannot exhaust the interpreter's recursion limit.

    Args:
        clusters: The starting clusters. The list is not modified; clusters
            that are never touched appear in the result as the same objects.
        addresses: The address groups to fold in. Elements must be hashable.

    Returns:
        The resulting list of clusters. Each merged cluster lists its members
        once, in first-seen order (the address group first, then the merged
        clusters in their existing order). When ``addresses`` is empty the
        ``clusters`` argument itself is returned.
    """
    current = clusters
    for tx in addresses:
        # Build the lookup set once per group instead of scanning the group
        # for every element of every cluster.
        tx_members = set(tx)
        touched = []
        untouched = []
        for cluster in current:
            if tx_members.isdisjoint(cluster):
                untouched.append(cluster)
            else:
                touched.append(cluster)

        # dict keys de-duplicate while keeping insertion order.
        merged = dict.fromkeys(tx)
        for cluster in touched:
            merged.update(dict.fromkeys(cluster))

        untouched.append(list(merged))
        current = untouched
    return current
-
def cluster_step_iter(clusters: "List[List[str]]", addresses: "List[List[str]]"):
    """Iteratively fold address groups into clusters.

    For each address group in order, the current clusters are split into
    those that overlap the group (per ``check_lists_overlap``) and those that
    do not. The overlapping ones are combined with the group by
    ``merge_lists_distinct`` and the combined cluster is appended after the
    untouched ones.

    Args:
        clusters: The starting clusters; the list itself is not modified.
        addresses: The address groups to fold in, processed in order.

    Returns:
        The resulting list of clusters. When ``addresses`` is empty the
        ``clusters`` argument itself is returned.
    """
    current = clusters
    # Walk the groups directly; re-slicing the remaining list on every pass
    # would copy it each time.
    for tx in addresses:
        overlapping = []
        untouched = []
        for cluster in current:
            if check_lists_overlap(tx, cluster):
                overlapping.append(cluster)
            else:
                untouched.append(cluster)

        untouched.append(merge_lists_distinct(tx, *overlapping))
        current = untouched
    return current
-
def cluster_n(clusters: "List[List[str]]", addresses: "List[List[str]]"):
    """Group all given clusters and address groups into disjoint sets.

    ``clusters`` and ``addresses`` are concatenated and treated alike: each
    entry is turned into a set and absorbed into every previously built
    component it shares an element with.

    Returns:
        A list of sets. Components that were not touched by the most recent
        entry keep their relative order; the component containing that entry
        is always placed last.
    """
    components = []
    for group in clusters + addresses:
        merged = set(group)
        survivors = []
        for component in components:
            if component.isdisjoint(merged):
                survivors.append(component)
            else:
                # Absorb the overlapping component; later components are
                # compared against the enlarged set.
                merged = component | merged
        survivors.append(merged)
        components = survivors
    return components
|