diff --git a/Modules/Ensemble.py b/Modules/Ensemble.py index 7c8f8c38..9cda743f 100644 --- a/Modules/Ensemble.py +++ b/Modules/Ensemble.py @@ -76,34 +76,30 @@ __EPSILON__ = 1e-6 __A_TO_BOHR__ = 1.889725989 -__JULIA_EXT__ = False +# The Julia runtime is booted lazily by JuliaExt at the first actual use +# (e.g. the first fourier gradient evaluation), so that importing +# sscha.Ensemble stays fast. +import sscha.JuliaExt as JuliaExt + + +class _LazyJuliaModule(object): + """Lazy stand-in for the old "import julia" module. + + Accessing the .Main attribute initializes the Julia runtime (and includes + fourier_gradient.jl) on first use. All the julia.Main.xxx(...) call sites + below keep working unchanged with both the juliacall and pyjulia backends. + """ + @property + def Main(self): + return JuliaExt.get_main() + + +julia = _LazyJuliaModule() + +# Deprecated alias kept for backward compatibility: it only tells whether a +# Julia backend is installed, the runtime is not initialized at import time. +__JULIA_EXT__ = JuliaExt.available() __JULIA_ERROR__ = "" -try: - import julia, julia.Main - julia.Main.include(os.path.join(os.path.dirname(__file__), - "fourier_gradient.jl")) - __JULIA_EXT__ = True -except: - try: - import julia - try: - from julia.api import Julia - jl = Julia(compiled_modules=False) - import julia.Main - julia.Main.include(os.path.join(os.path.dirname(__file__), - "fourier_gradient.jl")) - __JULIA_EXT__ = True - except: - # Install the required modules - julia.install() - try: - julia.Main.include(os.path.join(os.path.dirname(__file__), - "fourier_gradient.jl")) - __JULIA_EXT__ = True - except Exception as e: - warnings.warn("Julia extension not available.\nError: {}".format(e)) - except Exception as e: - warnings.warn("Julia extension not available.\nError: {}".format(e)) try: diff --git a/Modules/JuliaExt.py b/Modules/JuliaExt.py new file mode 100644 index 00000000..70a4e5fe --- /dev/null +++ b/Modules/JuliaExt.py @@ -0,0 +1,260 @@ +""" +Lazy bridge to the Julia runtime. + +This module is the single entry point for all the Julia calls of python-sscha. +The Julia runtime is NOT booted when this module (or sscha) is imported; +it is initialized on the first call to :func:`get_main`, so that users who do +not use ``fourier gradient`` never pay the startup cost. + +Two backends are supported: + +* ``juliacall`` (PythonCall.jl) — the default. It has no libpython coupling + (works with any Python interpreter, no ``python-jl`` needed) and installs + Julia automatically on a fresh machine through ``juliapkg``. +* ``pyjulia`` (PyCall.jl) — legacy. It is used only if juliacall is not + installed, or if another package (e.g. an old python-sscha) has already + booted the PyJulia runtime in this process: only one Julia runtime can + exist per process, so in that case we must reuse it. + +The backend can be forced with the environment variable +``SSCHA_JULIA_BACKEND`` set to ``juliacall``, ``pyjulia`` or ``none``. + +The object returned by :func:`get_main` mimics ``julia.Main`` from PyJulia: +attribute access gives callables, ``eval`` evaluates a code string and +``include`` loads a file. Under juliacall, numpy array arguments are +converted to native Julia ``Array``s (PyJulia semantics), because the +sscha Julia kernels use strictly-typed signatures that do not dispatch +on the no-copy ``PyArray`` wrappers, and array results are converted +back to numpy. +""" + +import importlib.util +import os +import sys +import threading + +import numpy as np + +# The Julia source files defining the sscha kernels, included at first use. +_JL_FILES = ["fourier_gradient.jl"] + +_BACKEND_ENV = "SSCHA_JULIA_BACKEND" + +_lock = threading.RLock() +_main = None +_init_error = None + + +class JuliaError(ImportError): + pass + + +def _requested_backend(): + backend = os.environ.get(_BACKEND_ENV, "").strip().lower() + if backend in ("juliacall", "pyjulia", "none"): + return backend + return "" + + +def available(): + """Check if a Julia backend is installed, WITHOUT booting the runtime. + + This is a cheap check (importlib.find_spec). The runtime may still fail + to initialize at first use (e.g. broken installation); in that case + :func:`get_main` raises a JuliaError with the details. + """ + if _main is not None: + return True + if _init_error is not None: + return False + + backend = _requested_backend() + if backend == "none": + return False + if backend == "juliacall": + return importlib.util.find_spec("juliacall") is not None + if backend == "pyjulia": + return importlib.util.find_spec("julia") is not None + + if "julia.Main" in sys.modules: + return True + return (importlib.util.find_spec("juliacall") is not None + or importlib.util.find_spec("julia") is not None) + + +def get_main(): + """Return the (lazily initialized) Julia Main proxy. + + The first call boots the Julia runtime and includes the sscha Julia + sources; subsequent calls return the cached proxy. Raises JuliaError if + no working backend is available. + """ + global _main, _init_error + + if _main is not None: + return _main + + with _lock: + if _main is not None: + return _main + if _init_error is not None: + raise JuliaError( + "The Julia extension failed to initialize earlier:\n{}".format( + _init_error)) + try: + _main = _initialize() + except Exception as e: + _init_error = "{}: {}".format(type(e).__name__, e) + raise JuliaError( + "Could not initialize the Julia extension.\n" + "Install it with: pip install juliacall\n" + "(Julia itself is downloaded automatically at first use.)\n" + "Original error: {}".format(_init_error)) + return _main + + +def _initialize(): + backend = _requested_backend() + if backend == "none": + raise JuliaError( + "The Julia extension is disabled ({}=none).".format(_BACKEND_ENV)) + + if not backend: + if "julia.Main" in sys.modules: + # PyJulia is already running in this process (e.g. booted by + # python-sscha); a second runtime cannot be created, reuse it. + backend = "pyjulia" + elif importlib.util.find_spec("juliacall") is not None: + backend = "juliacall" + elif importlib.util.find_spec("julia") is not None: + backend = "pyjulia" + else: + raise JuliaError("Neither juliacall nor pyjulia is installed.") + + if backend == "juliacall": + main = _init_juliacall() + else: + main = _init_pyjulia() + + dirname = os.path.dirname(os.path.abspath(__file__)) + for fname in _JL_FILES: + main.include(os.path.join(dirname, fname)) + return main + + +def _init_juliacall(): + # These must be set before the first "import juliacall". + # PyJulia honored JULIA_NUM_THREADS and defaulted to a single thread: + # keep exactly that behavior (some kernels use shared buffers inside + # Threads.@threads loops and are NOT safe with more than one thread). + n_threads = os.environ.get("JULIA_NUM_THREADS", "1") + os.environ.setdefault("PYTHON_JULIACALL_THREADS", n_threads) + if os.environ.get("PYTHON_JULIACALL_THREADS", "1") != "1": + # Required for safe Julia multithreading from Python. + os.environ.setdefault("PYTHON_JULIACALL_HANDLE_SIGNALS", "yes") + + from juliacall import Main, convert + return _JuliaCallMain(Main, convert) + + +def _init_pyjulia(): + import julia + + if "julia.Main" not in sys.modules: + try: + import julia.Main + except Exception: + # Statically linked python or libpython mismatch: PyCall cannot + # use its precompiled cache. This path is slow (it recompiles + # PyCall at every launch): prefer installing juliacall. + from julia.api import Julia + Julia(compiled_modules=False) + import julia.Main + + return _PyJuliaMain(sys.modules["julia.Main"]) + + +class _PyJuliaMain(object): + """PyJulia passthrough: julia.Main already speaks numpy.""" + + def __init__(self, main): + self._main = main + + def include(self, path): + return self._main.include(path) + + def eval(self, code): + return self._main.eval(code) + + def __getattr__(self, name): + return getattr(self._main, name) + + +# numpy dtype -> Julia element type, for the nested Vector{Vector{T}} case +_JL_ELTYPE = { + "float32": "Float32", + "float64": "Float64", + "int32": "Int32", + "int64": "Int64", + "complex64": "ComplexF32", + "complex128": "ComplexF64", + "bool": "Bool", +} + + +class _JuliaCallMain(object): + """juliacall proxy restoring PyJulia argument/return conventions.""" + + def __init__(self, main, convert): + # Avoid __getattr__ recursion: set everything through __dict__. + self.__dict__["_main"] = main + self.__dict__["_convert"] = convert + self.__dict__["_array_type"] = main.seval("Array") + self.__dict__["_nested_types"] = {} + + def include(self, path): + return self._main.include(path) + + def eval(self, code): + return self._from_julia(self._main.seval(code)) + + def __getattr__(self, name): + func = getattr(self._main, name) + + def _call(*args, **kwargs): + jl_args = [self._to_julia(a) for a in args] + jl_kwargs = {k: self._to_julia(v) for k, v in kwargs.items()} + return self._from_julia(func(*jl_args, **jl_kwargs)) + + _call.__name__ = name + return _call + + def _to_julia(self, x): + if isinstance(x, np.ndarray): + return self._convert(self._array_type, x) + + # Lists/tuples of 1d arrays with a common dtype are what the + # sparse-symmetry initializers expect as Vector{Vector{T}}. + if (isinstance(x, (list, tuple)) and len(x) > 0 + and all(isinstance(e, np.ndarray) and e.ndim == 1 for e in x)): + eltype = _JL_ELTYPE.get(x[0].dtype.name) + if eltype is not None and all(e.dtype == x[0].dtype for e in x): + nested = self._nested_types.get(eltype) + if nested is None: + nested = self._main.seval( + "Vector{{Vector{{{}}}}}".format(eltype)) + self._nested_types[eltype] = nested + return self._convert(nested, list(x)) + + return x + + def _from_julia(self, x): + if isinstance(x, tuple): + return tuple(self._from_julia(e) for e in x) + + import juliacall + if isinstance(x, juliacall.ArrayValue): + # Buffer-protocol view on the Julia data; numpy keeps the + # wrapper alive through ndarray.base. + return np.asarray(x) + return x diff --git a/Modules/Parallel.py b/Modules/Parallel.py index 3200c716..5f340820 100644 --- a/Modules/Parallel.py +++ b/Modules/Parallel.py @@ -4,8 +4,10 @@ modules. """ -import numpy as np +import numpy as np import time +import os +import sys # Supports both pypar and mpi4py __PYPAR__ = False @@ -46,14 +48,53 @@ def am_i_the_master(): else: return True -def pprint(*argv): +def _force_stdout_blocking(): + """ + Make sure the standard output is in blocking mode. + + MPI launchers (mpirun, srun, ...) frequently attach the standard output + of the processes to a pipe that is opened in *non-blocking* mode. When a + large message fills the pipe buffer, a write on a non-blocking descriptor + raises ``BlockingIOError`` ([Errno 11]) instead of waiting for the buffer + to drain. This used to crash a parallel minimization while printing the + table of imaginary frequencies (see issue #196). + + Restoring the blocking mode makes the write wait for the reader to consume + the buffer, which is the expected behaviour for log output. + """ + if not hasattr(os, "set_blocking"): + return + try: + fd = sys.stdout.fileno() + except (AttributeError, OSError, ValueError): + # stdout has been replaced by an object without a real descriptor + return + try: + os.set_blocking(fd, True) + except (OSError, ValueError): + pass + + +def pprint(*argv, **kwargs): """ PARALLEL PRINTING ================= - This will print on stdout only once in parallel execution of the code + This will print on stdout only once in parallel execution of the code. + + It is robust against a standard output opened in non-blocking mode, which + is a frequent source of ``BlockingIOError`` when running under MPI (see + issue #196): a log line must never abort a running calculation. """ #print("pypar:", __PYPAR__) #print("mpi4py:", __MPI4PY__) - if am_i_the_master(): - print(*argv) + if not am_i_the_master(): + return + + _force_stdout_blocking() + try: + print(*argv, **kwargs) + except BlockingIOError: + # The blocking mode could not be enforced (e.g. os.set_blocking is + # not available or failed). Never let a print crash the calculation. + pass diff --git a/Modules/juliapkg.json b/Modules/juliapkg.json new file mode 100644 index 00000000..e25e2598 --- /dev/null +++ b/Modules/juliapkg.json @@ -0,0 +1,3 @@ +{ + "julia": "^1.10" +} diff --git a/meson.build b/meson.build index 4a35e93e..85fe451a 100644 --- a/meson.build +++ b/meson.build @@ -210,6 +210,7 @@ py.install_sources([ 'Modules/Dynamical.py', 'Modules/Ensemble.py', # 'Modules/fourier_gradient.jl', + 'Modules/JuliaExt.py', 'Modules/LocalCluster.py', 'Modules/Minimizer.py', 'Modules/Optimizer.py', @@ -241,7 +242,7 @@ py.install_sources([ # Create a 'sscha' subdirectory within the Python installation directory # and copy the .jl files there. install_data( - ['Modules/fourier_gradient.jl'], # List the .jl files you need + ['Modules/fourier_gradient.jl', 'Modules/juliapkg.json'], # List the .jl files you need install_dir : py.get_install_dir() / 'sscha' ) # If there are many .jl files in multiple subdirectories, diff --git a/pyproject.toml b/pyproject.toml index cd563b50..25a7bea5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,11 @@ dependencies = [ # entries = { scripts = ["scripts/sscha", "scripts/cluster_check.x", ...] } # However, Meson is better at handling scripts like install_data +[project.optional-dependencies] +# Fast Julia-accelerated fourier gradients. juliacall installs the Julia +# runtime automatically at first use, no further setup is required. +julia = ["juliacall"] + [project.scripts] sscha-plot-data="sscha.cli:plot_data" sscha="sscha.cli:main" diff --git a/tests/test_pprint_blocking/test_pprint_blocking.py b/tests/test_pprint_blocking/test_pprint_blocking.py new file mode 100644 index 00000000..7755ddd1 --- /dev/null +++ b/tests/test_pprint_blocking/test_pprint_blocking.py @@ -0,0 +1,122 @@ +""" +Regression tests for issue #196. + +When a SSCHA minimization runs in parallel under an MPI launcher +(``mpirun``/``srun``), the standard output is frequently a pipe opened in +*non-blocking* mode. A large write (for instance the table of imaginary +frequencies printed by ``SchaMinimizer.check_imaginary_frequencies``) fills +the pipe buffer and a write on a non-blocking descriptor raises +``BlockingIOError`` ([Errno 11]) instead of waiting for the buffer to drain. +This used to abort the whole calculation. + +The fix lives in ``sscha.Parallel.pprint`` (the function aliased as ``print`` +across the package): it restores the blocking mode of stdout and, as a last +resort, never lets a log line crash the run. +""" +import os +import sys +import threading + +import pytest + +import sscha.Parallel + +# Non-blocking pipe semantics and os.set_blocking are POSIX-only. +pytestmark = pytest.mark.skipif( + sys.platform.startswith("win") or not hasattr(os, "set_blocking"), + reason="requires POSIX non-blocking file descriptors (os.set_blocking)", +) + +# Much larger than any pipe buffer (~64 KiB) or stdio buffer, so that the +# write cannot complete in a single non-blocking shot. +BIG_MESSAGE = "x" * (4 * 1024 * 1024) + + +def _drain(read_fd, sink=None): + """Consume a pipe until EOF, optionally collecting the bytes.""" + while True: + chunk = os.read(read_fd, 1 << 16) + if not chunk: + break + if sink is not None: + sink.extend(chunk) + + +def test_builtin_print_raises_on_nonblocking_stdout(): + """Reproduce the original failure: a plain ``print`` on a non-blocking + stdout raises ``BlockingIOError`` once the buffer fills up. This is exactly + what ``pprint`` used to do before the fix.""" + read_fd, write_fd = os.pipe() + os.set_blocking(write_fd, False) + stdout = os.fdopen(write_fd, "w") + saved = sys.stdout + sys.stdout = stdout + try: + with pytest.raises(BlockingIOError): + print(BIG_MESSAGE) # builtin print, no reader draining the pipe + stdout.flush() + finally: + sys.stdout = saved + # Drain the leftover buffered bytes so closing does not block/raise. + os.set_blocking(write_fd, True) + drainer = threading.Thread(target=_drain, args=(read_fd,)) + drainer.start() + try: + stdout.close() + except OSError: + pass + drainer.join() + os.close(read_fd) + + +def test_pprint_survives_nonblocking_stdout(): + """With the fix, ``pprint`` restores blocking mode and the large write + completes successfully instead of raising.""" + read_fd, write_fd = os.pipe() + os.set_blocking(write_fd, False) + + received = bytearray() + reader = threading.Thread(target=_drain, args=(read_fd, received)) + reader.start() + + stdout = os.fdopen(write_fd, "w") + saved = sys.stdout + sys.stdout = stdout + try: + # Must not raise BlockingIOError. + sscha.Parallel.pprint(BIG_MESSAGE) + stdout.flush() + finally: + sys.stdout = saved + stdout.close() + reader.join() + os.close(read_fd) + + assert BIG_MESSAGE.encode() in bytes(received) + + +def test_pprint_never_raises_when_blocking_cannot_be_set(monkeypatch): + """Safety net: even if blocking mode cannot be enforced, ``pprint`` must + swallow the error rather than abort the calculation.""" + monkeypatch.setattr(sscha.Parallel, "_force_stdout_blocking", lambda: None) + + read_fd, write_fd = os.pipe() + os.set_blocking(write_fd, False) + stdout = os.fdopen(write_fd, "w") + saved = sys.stdout + sys.stdout = stdout + try: + # stdout stays non-blocking and nobody reads: the internal write + # raises BlockingIOError, which pprint must catch. + sscha.Parallel.pprint(BIG_MESSAGE) + finally: + sys.stdout = saved + os.set_blocking(write_fd, True) + drainer = threading.Thread(target=_drain, args=(read_fd,)) + drainer.start() + try: + stdout.close() + except OSError: + pass + drainer.join() + os.close(read_fd)