"""Quantum ESPRESSO Calculator
export PYTHONPATH="Your-Location"/xespresso:$PYTHONPATH
export ASE_ESPRESSO_COMMAND="/path/to/PACKAGE.x PARALLEL -in PREFIX.PACKAGEi > PREFIX.PACKAGEo"
export ESPRESSO_PSEUDO="/path/to/pseudo"
Run PACKAGE.x jobs.
"""
from ase import io
from ase.calculators.calculator import (
FileIOCalculator,
CalculationFailed,
equal,
compare_atoms,
PropertyNotPresent,
)
from xespresso.xio import (
write_espresso_in,
read_espresso_input,
read_espresso_asei,
write_espresso_asei,
get_atomic_species,
get_atomic_constraints,
)
from xespresso.data.pseudo import pseudo_gropus
from ase.io.espresso import read_espresso_in
import os
import shutil
import numpy as np
from datetime import datetime
import copy
import logging
import sys
import subprocess
import warnings
from pprint import pprint
logging.basicConfig(
stream=sys.stdout,
format=("%(levelname)-8s " "[%(funcName)-20s]: %(message)s"),
level=logging.INFO,
)
logger = logging.getLogger(__name__)
error_template = (
'Property "%s" not available. Please try running Quantum\n'
"Espresso first by calling Atoms.get_potential_energy()."
)
warn_template = (
'Property "%s" is None. Typically, this is because the '
"required information has not been printed by Quantum "
'Espresso at a "low" verbosity level (the default). '
'Please try running Quantum Espresso with "high" verbosity.'
)
_blocked_keywords = {}
[docs]class Espresso(FileIOCalculator):
"""
Accepts all the options for pw.x as given in the QE docs, plus some
additional options:
input_data, pseudopotentials, kspacing, kpts, koffset
Please have a look at Espresso module in ASE.
https://wiki.fysik.dtu.dk/ase/ase/calculators/espresso.html
queue: dict
A dictionary with parameters for job submission, e.g.
>>> queue = {'nodes': 4, 'ntasks-per-node': 20,
>>> 'account': 'xxx', 'partition': 'normal',
>>> 'time': '23:59:00'}
package: str
Choose the quantum espresso package: pw, dos, projwfc, band, pp, ph, ..
For NEB calculation, please use neb.NEBEspresso module.
Calculaiton use phonon is not implemented yet.
parallel: str
A str which control the parallelization parameters: -nimage, -npools,
-nband, -ntg, -ndiag or -northo (shorthands, respectively:
-ni, -nk, -nb, -nt, -nd).
Examples:
1. Perform a regular self-consistent calculation:
>>> calc = Espresso(input_data=input_data, ...)
>>> atoms.set_calculator(calc)
>>> atoms.get_potential_energy()
"""
implemented_properties = ["energy", "forces", "stress", "magmoms", "time"]
command = "PACKAGE.x PARALLEL -in PREFIX.PACKAGEi > PREFIX.PACKAGEo"
discard_results_on_any_change = False
def __init__(
self,
label="xespresso",
prefix=None,
atoms=None,
package="pw",
parallel="",
queue=None,
debug=False,
pseudo_group=None,
**kwargs
):
print("{0:=^60}".format(package))
if debug:
logger.setLevel(debug)
self.set_label(label, prefix)
self.scf_directory = None
self.scf_parameters = None
self.scf_results = None
if atoms:
self.atoms = atoms
if pseudo_group is not None:
kwargs["pseudopotentials"] = self.find_pseudopotentials(pseudo_group)
kwargs["pseudo_dir"] = os.path.join(
os.environ["ESPRESSO_PSEUDO"], pseudo_group
)
kwargs = self.check_input(kwargs, prefix=self.prefix)
self.ase_parameters = kwargs
FileIOCalculator.__init__(
self, restart=self.directory, label=self.label, atoms=atoms, **kwargs
)
self.queue = queue
self.parallel = parallel
self.package = package
self.parallel = parallel
self.debug = debug
# self.discard_results_on_any_change = False
[docs] def find_pseudopotentials(self, pseudo_group="SSSP_1.1.2_PBE_efficiency"):
"""Get pseudo potential by family name.
Args:
pseudo_group (str): name of the pseudo family.
"""
elements = set(self.atoms.get_chemical_symbols())
pseudopotentials = {}
for ele in elements:
pseudopotentials[ele] = pseudo_gropus[pseudo_group][ele.upper()]
return pseudopotentials
[docs] def set_label(self, label, prefix):
"""Set directory and prefix from label"""
self.directory = label
if not prefix:
self.prefix = os.path.split(label)[1]
else:
self.prefix = prefix
if not os.path.exists(self.directory):
os.makedirs(self.directory)
self.label = os.path.join(self.directory, self.prefix)
self.pwi = os.path.join(self.directory, "%s.pwi" % self.prefix)
self.pwo = os.path.join(self.directory, "%s.pwo" % self.prefix)
self.asei = os.path.join(self.directory, "%s.asei" % self.prefix)
self.asei_temp = os.path.join(self.directory, ".%s.asei_temp" % self.prefix)
self.save_directory = os.path.join(self.directory, "%s.save" % self.prefix)
logger.debug("Directory: %s" % (self.directory))
logger.debug("Prefix: %s" % (self.prefix))
[docs] def check_pseudopotentials(self, pseudopotentials):
"""check pseudopotentials"""
# pseudopotentials
if "species" not in self.atoms.arrays:
all_species = set(self.atoms.get_chemical_symbols())
else:
all_species = set(self.atoms.arrays["species"])
new_pseudopotentials = {}
for species in all_species:
assert (
species in pseudopotentials
), "\n Species: %s, does not have a pseudopotentials!" % (species)
new_pseudopotentials[species] = pseudopotentials[species]
[docs] def read(self, label):
"""Read the files in a calculation if they exist.
This function reads self.parameters and atoms.
"""
# Else read a regular calculation. we start with reading stuff
# that is independent of the calculation state.
self.directory = label
self.restart_atoms = None
self.restart_parameters = {}
if os.path.exists(label) and os.path.exists(self.asei):
logger.debug("Try to read information from folder: %s." % (label))
try:
# atoms, parameters = self.read_xml_file()
atoms, parameters = read_espresso_asei(self.asei)
self.restart_atoms = atoms
self.restart_parameters = copy.deepcopy(parameters)
# self.parameters = copy.deepcopy(parameters)
self.read_results()
except Exception as e:
logger.debug("Directory: %s exist, faild to read. %s" % (label, e))
else:
logger.debug(
"Directory %s is not a espresso folder, start a new calculation:"
% (label)
)
def set(self, **kwargs):
""" """
from xespresso.input_parameters import restart_ignore
from xespresso.utils import compare_parameters
self.changed_parameters, self.igonre_parameters = compare_parameters(
self.restart_parameters, self.ase_parameters, ignore=restart_ignore["PW"]
)
self.parameters = kwargs
if self.discard_results_on_any_change and self.changed_parameters:
self.reset()
return self.changed_parameters
[docs] def check_state(self, atoms, tol=1e-4):
"""Check for any system changes since last calculation."""
if not os.path.exists(self.asei):
return True
if isinstance(self.restart_atoms, list):
if len(self.restart_atoms) != len(atoms):
return True
for i in range(len(atoms)):
if compare_atoms(
self.restart_atoms[i],
atoms[i],
tol=tol,
excluded_properties=set(self.ignored_changes),
):
logger.debug("Atoms changed, run a new calculation")
return True
else:
if compare_atoms(
self.restart_atoms,
atoms,
tol=tol,
excluded_properties=set(self.ignored_changes),
):
logger.debug("Atoms changed, run a new calculation")
return True
# if self.igonre_parameters:
# logger.debug('Parameter: %s are ignored, results are not affected.'%self.igonre_parameters)
if self.changed_parameters:
logger.debug(
"Parameter: %s changed, results are affected" % self.changed_parameters
)
return True
logger.debug("Same geometry and parameters, use previous results.")
converged, meg = self.read_convergence()
if converged > 0:
logger.debug("Not converged: %s" % meg)
return True
return False
[docs] def read_results(self):
"""Read PW results.
get atomic species
"""
pwo = os.path.join(self.directory, "%s.pwo" % self.prefix)
pwi = os.path.join(self.directory, "%s.pwi" % self.prefix)
convergence, meg = self.read_convergence()
if convergence != 0:
logger.debug("Not converged. %s" % (meg))
try:
atoms = read_espresso_in(pwi)
self.results["atoms"] = atoms
except Exception as e:
logger.debug("Read input: %s, failed! %s" % (pwi, e))
try:
output = io.read(pwo)
atomic_species = get_atomic_species(pwo)
constraints = get_atomic_constraints(pwo, len(output))
output.set_constraint(None)
output.set_constraint(constraints)
if atomic_species:
output.new_array("species", np.array(atomic_species, dtype="U20"))
self.calc = output.calc
self.results = output.calc.results
self.results["atoms"] = output
self.efermi = self.get_fermi_level()
# self.nspins = self.get_number_of_spins()
logger.debug("Read result successfully!")
except Exception as e:
logger.debug("Read output: %s, failed! %s" % (pwo, e))
self.results["convergence"] = convergence
self.results["label"] = self.label
# logger.debug('Read result failed!')
# pwos = [file for file in os.listdir(self.directory) if pwo in file]
# output = None
# for pwo in pwos:
# atomic_species = None
# pwo = os.path.join(self.directory, pwo)
# atomic_species = get_atomic_species(pwo)
[docs] def check_xml_file(self):
"""
If data-file-schema.xml is readable, then xml_end = True
"""
self.save_directory = os.path.join(self.directory, "%s.save" % self.prefix)
if not os.path.exists(self.save_directory):
return False
xml0 = os.path.join(self.save_directory, "data-file-schema.xml")
xmls = [
file
for file in os.listdir(self.save_directory)
if "data-file-schema.xml" in file
]
goodxml = []
for xml in xmls:
xml = os.path.join(self.save_directory, xml)
with open(xml, "r") as f:
lines = f.readlines()
if len(lines) < 1:
continue
if "</qes:espresso>" in lines[-1]:
goodxml.append(xml)
xml_end = False
nxml = len(goodxml)
# print(nxml, goodxml, xml0)
if nxml > 0:
if xml0 not in goodxml:
shutil.copy(goodxml[-1], xml0)
xml_end = True
return xml_end
def read_xml_file(self):
from xespresso.utils.xml_parser import xml_parser
if not self.check_xml_file():
logger.debug("xml file not finished.")
#
xmlfile = os.path.join(self.save_directory, "data-file-schema.xml")
if not os.path.exists(xmlfile):
return None, {}
try:
atoms, xml_input = xml_parser(xmlfile)
logger.debug("Read geometry and parameters from xml file successfully.")
except:
logger.debug("Read xml file failed.")
return None, {}
return atoms, xml_input
[docs] def read_convergence(self):
"""
Read the status of the calculation.
exit code:
'0': 'Done',
'-1': ' manual cancelled',
'1': 'Maximum CPU time exceeded', convergence NOT achieved after n iterations
Then, change the parameter: 1) mixing_beta or 2) degauss
'2': 'manual restart', 'Pending or not submit',
'3': 'other errors',
'4': 'unknow error',
"""
# read the error file from queue
# if self.queue:
# errfile = self.label + '.err'
# if not os.path.exists(errfile):
# logger.debug('%s not exists'%errfile)
# # return 2, 'Pending or not submit'
# else:
# errs = ['RESTART', 'pw.x', 'out-of-memory',
# 'NODE FAILURE', 'TIME LIMIT', 'COMMUNICATION',
# 'segmentation fault', 'PARSE_ERR', 'mpirun']
# cancelled = False
# with open(errfile, 'r') as f:
# lines = f.readlines()
# for line in lines:
# if 'CANCELLED' in line:
# cancelled = True
# for err in errs:
# if err in line:
# logger.debug('Need restart')
# return 3, line
# # cancelled by owner
# if len(lines) > 0:
# if cancelled:
# logger.debug('Cancelled')
# return -1, lines[0]
# no error from queue, or job not run in the queue
output = self.label + ".pwo"
if not os.path.exists(output):
# print('%s not exists'%output)
return 3, "No pwo output file"
with open(output, "r") as f:
lines = f.readlines()
if len(lines) == 0:
return 1, "pwo file has nothing"
stime = lines[1].split("starts on")[1]
nlines = len(lines)
n = min([200, nlines])
lastlines = lines[-n:-1]
for line in lastlines:
if line.rfind("too many bands are not converged") > -1:
logger.debug("Need restart")
return 1, "Reason: %s" % (line)
if line.rfind("convergence NOT achieved after") > -1:
logger.debug("Need restart: %s" % line)
return 1, "Reason: %s" % (line)
if line.rfind("Maximum CPU time exceeded") > -1:
logger.debug("Need restart")
return 2, "Reason: %s" % (line)
if line.rfind("JOB DONE.") > -1:
logger.debug("JOB DONE")
return 0, line
logger.debug("Not converged, %s" % lastlines[-1])
return 4, line
[docs] def run(self, atoms=None, restart=False):
"""
run and restart
"""
if atoms is not None:
self.atoms = atoms
icount = 1
converged = 4
meg0 = "None"
self.atoms.calc = self
if not restart:
try:
logger.debug("Run step: %s" % icount)
self.atoms.get_potential_energy()
converged, meg0 = self.read_convergence()
except Exception as e:
logger.debug("Not converge: %s" % e)
converged, meg0 = self.read_convergence()
icount += 1
if converged == 0:
converged = 4
# print(converged, xml_end, meg0)
# restart 0, 1, 2
exitfile = os.path.join(self.directory, "EXIT")
while converged > 0 or restart == 2:
if os.path.exists(exitfile):
logger.debug("Exit file exists!")
os.remove(exitfile)
sys.exit()
restart = 1
self.backup_file("%s.pwo" % self.prefix, directory=self.directory)
self.backup_file("data-file-schema.xml", directory=self.save_directory)
logger.debug("Run step: %s" % icount)
xml_end = self.check_xml_file()
if xml_end:
self.parameters["input_data"]["CONTROL"]["restart_mode"] = "restart"
else:
self.parameters["input_data"]["CONTROL"][
"restart_mode"
] = "from_scratch"
if converged == 1:
if "electron_maxstep" in self.parameters["input_data"]["ELECTRONS"]:
electron_maxstep = int(
self.parameters["input_data"]["ELECTRONS"]["electron_maxstep"]
)
else:
electron_maxstep = 100
if "mixing_beta" in self.parameters["input_data"]["ELECTRONS"]:
mixing_beta = float(
self.parameters["input_data"]["ELECTRONS"]["mixing_beta"]
)
else:
mixing_beta = 0.7
if electron_maxstep < 100:
self.parameters["input_data"]["ELECTRONS"]["electron_maxstep"] = (
electron_maxstep + 50
)
logger.debug(
"electron_maxstep change from %s to %s"
% (electron_maxstep, electron_maxstep + 50)
)
self.parameters["input_data"]["ELECTRONS"]["mixing_beta"] = (
mixing_beta / 2.0
)
logger.debug(
"mixing_beta change from %s to %s"
% (mixing_beta, mixing_beta / 2.0)
)
try:
icount += 1
atoms.get_potential_energy()
converged, meg = self.read_convergence()
except Exception as e:
logger.debug("Not converge: %s" % e)
converged, meg = self.read_convergence()
meg = str(e)
restart = 2
# if converged == 0:
converged = 4
logger.debug("%s, %s" % (converged, meg))
if meg == meg0 and converged != 2:
logger.debug("Exit! Maybe deadblock! Same error message! %s" % meg)
sys.exit()
meg0 = meg
[docs] def backup_file(self, src, directory="."):
"""
compare files
backup
"""
import filecmp
tnow = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
new_src = os.path.join(directory, "%s-%s" % (tnow, src))
src = os.path.join(directory, src)
if not os.path.exists(src):
return 0
ftype = src.split(".")[-1]
flag = True
for dst in os.listdir(directory):
dst = os.path.join(directory, dst)
if dst == src:
continue
if dst.endswith(ftype):
if filecmp.cmp(dst, src):
flag = False
# print('backup files: ', flag, src, new_src)
if flag:
shutil.copy(src, new_src)
return 0
def plot_phdos(self, fldos=None, ax=None, output=None):
""" """
import matplotlib.pyplot as plt
if fldos is None:
fldos = self.prefix
phdos = np.loadtxt(self.directory + "/%s.phdos" % fldos)
self.phdos = phdos
if ax is None:
fig, ax = plt.subplots(figsize=(6, 3))
# ax = plt.gca()
ax.plot(self.phdos[:, 0], self.phdos[:, 1], linewidth=0.7)
ax.set_xlabel("Frequency (cm^-1)")
ax.set_ylabel("DOS (a.u.)")
if output is not None:
plt.savefig("%s" % output)
return ax
def get_work_function(
self,
ax=None,
inpfile="potential.cube",
output=None,
shift=False,
):
import matplotlib.pyplot as plt
from ase.io.cube import read_cube_data
from ase.units import create_units
if ax is None:
import matplotlib.pyplot as plt
ax = plt.gca()
units = create_units("2006")
#
filename = os.path.join(self.directory, "pp", inpfile)
data, atoms = read_cube_data(filename)
data = data * units["Ry"]
ef = self.get_fermi_level()
# x, y, z, lp = calc.get_local_potential()
nx, ny, nz = data.shape
axy = np.array([np.average(data[:, :, z]) for z in range(nz)])
# setup the x-axis in realspace
uc = atoms.get_cell()
xaxis = np.linspace(0, uc[2][2], nz)
if shift:
ax.plot(xaxis, axy - ef)
ef = 0
else:
ax.plot(xaxis, axy)
ax.plot([min(xaxis), max(xaxis)], [ef, ef], "k:")
ax.set_xlabel("Position along z-axis ($\AA$)")
ax.set_ylabel("Potential (eV)")
if output:
plt.savefig("%s" % output)
# plt.show()
atoms = self.results["atoms"]
pos = max(atoms.positions[:, 2] + 3)
ind = (xaxis > pos) & (xaxis < pos + 3)
wf = np.average(axy[ind]) - ef
print("min: %s, max: %s" % (pos, pos + 3))
print("The workfunction is {0:1.2f} eV".format(wf))
return wf
def get_bader_charge(self, inpfile=None):
""" """
from ase.io.bader import attach_charges
if not inpfile:
inpfile = "%s.cube" % self.prefix
command = "bader %s" % inpfile
print(command)
try:
proc = subprocess.Popen(
command, shell=True, cwd=os.path.join(self.directory, "pp")
)
except OSError as err:
msg = 'Failed to execute "{}"'.format(command)
raise EnvironmentError(msg) from err
acf = os.path.join(self.directory, "ACF.dat")
attach_charges(self.results["atoms"], acf)
def read_time(
self,
):
""" """
pwo = os.path.join(self.directory, self.prefix + ".pwo")
with open(pwo, "r") as f:
ts = 0
lines = f.readlines()
for line in lines[::-1]:
if "PWSCF" in line and "WALL" in line:
t = line.split("CPU")[-1].split("WALL")[0]
if "s" in t:
ts = float(t.split("m")[-1].split("s")[0])
if "m" in t:
tm = float(t.split("h")[-1].split("m")[0])
ts += tm * 60
if "h" in t:
th = float(t.split("h")[0])
ts += th * 3600
break
self.results["time"] = ts
return ts
def get_time(
self,
):
t = 0
filename = os.path.join(self.directory, "%s.pwo" % self.prefix)
with open(filename) as f:
lines = f.readlines()
for line in lines[-200:]:
if "init_run :" in line or "electrons :" in line:
t += float(line.split("CPU")[1].split("s WALL")[0])
self.results["time"] = t
return t
[docs] def clean(self):
"""
remove wfc, hub files
"""
keys = [".wfc", ".hub"]
files = os.listdir(self.directory)
for file in files:
for key in keys:
if key in file:
os.remove(os.path.join(self.directory, file))
files = os.listdir(self.save_directory)
# keys = ['wfc']
# for file in files:
# for key in keys:
# if key in file:
# os.remove(os.path.join(self.save_directory, file))
def get_fermi_level(self):
if self.calc is None:
raise PropertyNotPresent(error_template % "Fermi level")
return self.calc.get_fermi_level()
def get_ibz_k_points(self):
if self.calc is None:
raise PropertyNotPresent(error_template % "IBZ k-points")
ibzkpts = self.calc.get_ibz_k_points()
if ibzkpts is None:
warnings.warn(warn_template % "IBZ k-points")
return ibzkpts
def get_k_point_weights(self):
if self.calc is None:
raise PropertyNotPresent(error_template % "K-point weights")
k_point_weights = self.calc.get_k_point_weights()
if k_point_weights is None:
warnings.warn(warn_template % "K-point weights")
return k_point_weights
def get_eigenvalues(self, **kwargs):
if self.calc is None:
raise PropertyNotPresent(error_template % "Eigenvalues")
eigenvalues = self.calc.get_eigenvalues(**kwargs)
if eigenvalues is None:
warnings.warn(warn_template % "Eigenvalues")
return eigenvalues
def get_number_of_spins(self):
if self.calc is None:
raise PropertyNotPresent(error_template % "Number of spins")
nspins = self.calc.get_number_of_spins()
if nspins is None:
warnings.warn(warn_template % "Number of spins")
return nspins