123456789101112131415161718192021222324252627282930313233343536373839404142434445 |
- from cassandra.cluster import Cluster
- from cassandra.query import BoundStatement, BatchStatement
- import csv
-
-
def db_insert_csv_txs(config, tx_file, skip=0, limit=-1, batch_size=256):
    """Bulk-insert transaction rows from a CSV file into a Cassandra table.

    Connects to the cluster described by ``config``, prepares an INSERT into
    ``config['tx_table_name']`` and sends the CSV rows to Cassandra in
    batches of at most ``batch_size`` statements.

    Args:
        config: Mapping providing ``cassandra_addresses`` (passed to
            ``Cluster`` as the contact points), ``cassandra_port``,
            ``cassandra_keyspace`` and ``tx_table_name``.
        tx_file: Path to a CSV file (Excel dialect). The first row is a
            header and is skipped. Each data row must hold, in order:
            tx_id (int), address (str), value (int), tx_hash (str),
            block_id (int), timestamp (int).
        skip: Number of leading data rows (after the header) to skip.
        limit: Zero-based data-row index at which to stop, exclusive.
            Rows at this index and beyond are not inserted. This is an
            end index, not a count of rows after ``skip``. A negative
            value (the default) or ``None`` means "no limit".
        batch_size: Maximum number of INSERT statements sent per batch.

    Raises:
        ValueError: If ``batch_size`` is less than 1, or a CSV field cannot
            be converted by ``int()``.
        IndexError: If a CSV row has fewer than six fields.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    print(" == DB TX INSERTION SCRIPT == ")

    print(
        f"Attempting Cassandra connection @ {config['cassandra_addresses']}:{config['cassandra_port']}")
    cluster = Cluster(config['cassandra_addresses'],
                      port=config['cassandra_port'])
    try:
        session = cluster.connect(config['cassandra_keyspace'])
        print("Connection OK")

        statement = session.prepare(
            f"INSERT INTO {config['tx_table_name']} (tx_id,address,value,tx_hash,block_id,timestamp) VALUES(?,?,?,?,?,?);")

        with open(tx_file, newline='', encoding='utf-8') as tx_csv:
            rowreader = csv.reader(tx_csv, dialect="excel")
            next(rowreader, None)  # skip header; an empty file yields no rows

            # NOTE(review): BatchStatement() uses the driver's default batch
            # type (LOGGED); for bulk loads spanning many partitions an
            # UNLOGGED batch may be preferable — confirm against the schema.
            batch = BatchStatement()
            batch_count = 0

            for i, row in enumerate(rowreader):
                # Test the limit before the skip so that a limit smaller than
                # skip stops the load instead of being silently ignored.
                if limit is not None and 0 <= limit <= i:
                    break
                if i < skip:
                    continue

                params = (int(row[0]), str(row[1]), int(row[2]),
                          str(row[3]), int(row[4]), int(row[5]))
                # Pass the prepared statement plus values directly; the batch
                # binds them itself, so no shared BoundStatement is mutated.
                batch.add(statement, params)
                batch_count += 1

                if batch_count >= batch_size:
                    session.execute(batch)
                    batch = BatchStatement()
                    batch_count = 0

            # Flush the remainder, but never send an empty batch.
            if batch_count:
                session.execute(batch)

        print("Done!")
    finally:
        # Shut the cluster down even if connecting, preparing, parsing or
        # inserting failed part-way through.
        cluster.shutdown()
|