Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
write_reaction_system: bool
whether or not to write reaction_system table
"""
self.stdout.write('Starting transfer\n')
con = self.connection or self._connect()
self._initialize(con)
self.stdout.write('Finished initialization\n')
cur = con.cursor()
self.stdout.write('Got a cursor\n')
self.stdout.write('Connecting to {0}\n'.format(self.server_name))
nrows = 0
if write_ase:
self.stdout.write('Transfering atomic structures\n')
db = ase.db.connect(filename_sqlite)
n_structures = db.count()
n_blocks = n_structures // block_size + 1
t_av = 0
for block_id in range(start_block, n_blocks):
i = block_id - start_block
t1 = time.time()
b0 = block_id * block_size
b1 = (block_id + 1) * block_size + 1
if block_id + 1 == n_blocks:
b1 = n_structures + 1
rows = list(db.select('{}
def merge_datasets(merged_dbpath, dbpaths, **mergedb_kwargs):
if type(dbpaths) is dict:
names = dbpaths.keys()
dbpaths = dbpaths.values()
else:
names = [dbp.split("/")[-1].split(".")[0] for dbp in dbpaths]
partitions = {}
offset = 0
partition_meta = {}
with connect(merged_dbpath, use_lock_file=False) as dst:
for name, dbp in zip(names, dbpaths):
start = offset
if name in partitions.keys():
count = 1
while name + "_" + str(count) in partitions.keys():
count += 1
name = name + "_" + str(count)
with connect(dbp) as src:
length = src.count()
end = offset + length
partition_meta[name] = src.metadata
for row in src.select():
at = row.toatoms()
cur.execute(
"SELECT COUNT(*) FROM information WHERE name='metadata'")
if cur.fetchone()[0]:
cur.execute(
"UPDATE information SET value=? WHERE name='metadata'", [md])
else:
cur.execute('INSERT INTO information VALUES (?, ?)',
('metadata', md))
con.commit()
if __name__ == '__main__':
import sys
from ase.db import connect
con = connect(sys.argv[1])
con._initialize(con._connect())
print('Version:', con.version)
def submit_relaxation_db(self, database, spec=None):
"""Submit each entry of an ASE database for relaxation.
This requires that the calculation parameters be stored in
the data under `data.calculator_parameters`.
Parameters
----------
database : str
Path to ASE database to be looped over for relaxation.
spec : dict
Additional specification to be passed to Fireworks.
"""
filename = database.split('/')[-1]
db = connect(database)
for d in db.select():
self.submit_relaxation(
d, workflow_name=filename, spec=spec)
# creates: ase-db.txt, ase-db-long.txt, known-keys.csv
from __future__ import print_function
import subprocess
import ase.db
from ase import Atoms
from ase.calculators.emt import EMT
from ase.db.core import default_key_descriptions
from ase.optimize import BFGS
c = ase.db.connect('abc.db', append=False)
h2 = Atoms('H2', [(0, 0, 0), (0, 0, 0.7)])
h2.calc = EMT()
h2.get_forces()
c.write(h2, relaxed=False)
BFGS(h2).run(fmax=0.01)
c.write(h2, relaxed=True, data={'abc': [1, 2, 3]})
for d in c.select('molecule'):
print(d.forces[0, 2], d.relaxed)
h = Atoms('H')
h.calc = EMT()
h.get_potential_energy()
File dependencies:
------------------
fname : ase-db file
Contains molecular reference states.
Mandatory key value pairs:
--------------------------
"energy" or "epot" : float
DFT calculated potential energy.
Optional key value pairs:
--------------------------
"data.BEEFens" : list
32 non-selfconsistent BEEF-vdW energies.
"""
# Connect to a database.
cmol = ase.db.connect(fname)
# Select data using search filters.
smol = cmol.select(selection)
# Connect to a database with frequencies.
if freq_path is not None:
c_freq = ase.db.connect(freq_path)
abinitio_energies = {}
freq_dict = {}
dbids = {}
ens_dict = {}
# Iterate over molecules.
for d in smol:
if 'energy' in d:
abinitio_energy = float(d.energy)
else:
abinitio_energy = float(d.epot)
species_name = str(d.formula)
@monkeypatch_class(vasp.Vasp)
def get_db(self, *keys):
"""Retrieve values for each key in keys.
First look for key/value, then in data.
"""
dbfile = os.path.join(self.directory, 'DB.db')
if not os.path.exists(dbfile):
return [None for key in keys] if len(keys) > 1 else None
vals = [None for key in keys]
from ase.db import connect
with connect(dbfile) as con:
try:
at = con.get(id=1)
for i, key in enumerate(keys):
vals[i] = (at.key_value_pairs.get(key, None)
or at.data.get(key, None))
except KeyError, e:
if e.message == 'no match':
pass
return vals if len(vals) > 1 else vals[0]
def extxyz_to_db(extxyz_path, db_path):
r"""
Convertes en extxyz-file to an ase database
Args:
extxyz_path (str): path to extxyz-file
db_path(str): path to sqlite database
"""
with connect(db_path, use_lock_file=False) as conn:
with open(extxyz_path) as f:
for at in tqdm(read_xyz(f, index=slice(None)), "creating ase db"):
data = {}
if at.has("forces"):
data["forces"] = at.get_forces()
data.update(at.info)
conn.write(at, data=data)
use_bits (bool, optional): set True to return the non-zero bits in the
fingerprint instead of the pybel.Fingerprint object (default: False)
use_con_mat (bool, optional): set True to use pre-computed connectivity
matrices (need to be stored in the training database in compressed format
under the key 'con_mat', default: False)
Returns:
dict (str->list of tuple): dictionary with list of tuples under the key
'fingerprints' containing the fingerprint, the canonical smiles representation,
and the atoms per type string of each molecule listed in train_idx (preserving
the order)
'''
train_fps = []
if use_con_mat:
compressor = ConnectivityCompressor()
with connect(dbpath) as conn:
if not print_file:
print('0.00%', end='\r', flush=True)
for i, idx in enumerate(train_idx):
idx = int(idx)
row = conn.get(idx + 1)
at = row.toatoms()
pos = at.positions
atomic_numbers = at.numbers
if use_con_mat:
con_mat = compressor.decompress(row.data['con_mat'])
else:
con_mat = None
train_fps += [get_fingerprint(pos, atomic_numbers,
use_bits, con_mat)]
if (i % 100 == 0 or i + 1 == len(train_idx)) and not print_file:
print('\033[K', end='\r', flush=True)
def build(self, name):
if name == '-':
con = db.connect(sys.stdin, 'json')
return con.get_atoms(add_additional_information=True)
elif self.args.collection:
con = db.connect(self.args.collection)
return con.get_atoms(name)
else:
atoms = read(name)
if isinstance(atoms, list):
assert len(atoms) == 1
atoms = atoms[0]
return atoms