diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py
index a28b229330..d204c3f26f 100644
--- a/flocker/volume/filesystems/zfs.py
+++ b/flocker/volume/filesystems/zfs.py
@@ -7,23 +7,26 @@
 from __future__ import absolute_import
 
 import os
 
+import libzfs_core
+
+from functools import wraps
 from contextlib import contextmanager
 from uuid import uuid4
-from subprocess import (
-    CalledProcessError, STDOUT, PIPE, Popen, check_call, check_output
-)
+from subprocess import call, check_call
+from Queue import Queue
 
 from characteristic import attributes, with_cmp, with_repr
 
 from zope.interface import implementer
 
-from eliot import Field, MessageType, Logger
+from eliot import Field, MessageType, Logger, write_traceback
 
 from twisted.python.failure import Failure
 from twisted.python.filepath import FilePath
 from twisted.internet.endpoints import ProcessEndpoint, connectProtocol
 from twisted.internet.protocol import Protocol
-from twisted.internet.defer import Deferred, succeed, gatherResults
+from twisted.internet.defer import Deferred, succeed
+from twisted.internet.threads import deferToThreadPool
 from twisted.internet.error import ConnectionDone, ProcessTerminated
 from twisted.application.service import Service
 
@@ -67,26 +70,109 @@ def connectionLost(self, reason):
         del self._result
 
 
-def zfs_command(reactor, arguments):
+class _AsyncLZC(object):
     """
-    Asynchronously run the ``zfs`` command-line tool with the given arguments.
+    A proxy class for asynchronous execution using a given reactor and its
+    thread pool.
+
+    Primarily this class dispatches its method calls to the functions in
+    :mod:`libzfs_core`, but it can also be used to run an arbitrary function
+    asynchronously.
+    """
+
+    def __init__(self, reactor):
+        """
+        :param reactor: the reactor that is to be used for the asynchronous
+            execution.
+        """
+        self._reactor = reactor
+        self._cache = {}
+
+    def callDeferred(self, func, *args, **kwargs):
+        """
+        This is a thin wrapper around :func:`deferToThreadPool`.
+
+        Its primary advantage is that the reactor is already associated with
+        this :class:`_AsyncLZC` instance, so its thread pool can be obtained
+        with :meth:`getThreadPool`.
+        """
+        return deferToThreadPool(self._reactor, self._reactor.getThreadPool(),
+                                 func, *args, **kwargs)
+
+    def __getattr__(self, name):
+        """
+        Pretend that this class provides the same methods as the functions
+        in :mod:`libzfs_core`.  The proxy methods execute the functions
+        asynchronously using the reactor and its thread pool.
+        """
+        try:
+            return self._cache[name]
+        except KeyError:
+            func = getattr(libzfs_core, name)
+
+            @wraps(func)
+            def _async_wrapper(*args, **kwargs):
+                return self.callDeferred(func, *args, **kwargs)
+
+            self._cache[name] = _async_wrapper
+            return self._cache[name]
+
+
+_reactor_to_alzc = {}
+
+
+def _async_lzc(reactor):
+    """
+    Return an instance of :class:`_AsyncLZC` for the given reactor.
+
+    The instance gets associated with the reactor and the same instance will
+    be returned for subsequent calls with the same ``reactor`` argument.
+
+    :param reactor: the reactor.
+    """
+    try:
+        return _reactor_to_alzc[reactor]
+    except KeyError:
+        _reactor_to_alzc[reactor] = _AsyncLZC(reactor)
+        return _reactor_to_alzc[reactor]
+
+
+def ext_command(reactor, arguments):
+    """
+    Asynchronously run the given command-line tool with the given arguments.
 
-    :param reactor: A ``IReactorProcess`` provider.
+    :param reactor: An ``IReactorProcess`` provider.
 
-    :param arguments: A ``list`` of ``bytes``, command-line arguments to
-        ``zfs``.
+    :param arguments: A ``list`` of ``bytes``, the command and command-line
+        arguments.
 
     :return: A :class:`Deferred` firing with the bytes of the result (on
         exit code 0), or errbacking with :class:`CommandFailed` or
         :class:`BadArguments` depending on the exit code (1 or 2).
     """
-    endpoint = ProcessEndpoint(reactor, b"zfs", [b"zfs"] + arguments,
+    endpoint = ProcessEndpoint(reactor, arguments[0], arguments,
                                os.environ)
     d = connectProtocol(endpoint, _AccumulatingProtocol())
     d.addCallback(lambda protocol: protocol._result)
     return d
 
 
+def zfs_command(reactor, arguments):
+    """
+    Asynchronously run the ``zfs`` command-line tool with the given arguments.
+
+    :param reactor: An ``IReactorProcess`` provider.
+
+    :param arguments: A ``list`` of ``bytes``, command-line arguments to
+        ``zfs``.
+
+    :return: A :class:`Deferred` firing with the bytes of the result (on
+        exit code 0), or errbacking with :class:`CommandFailed` or
+        :class:`BadArguments` depending on the exit code (1 or 2).
+    """
+    return ext_command(reactor, [b"zfs"] + arguments)
+
+
 _ZFS_COMMAND = Field.forTypes(
     "zfs_command", [bytes], u"The command which was run.")
 _OUTPUT = Field.forTypes(
@@ -100,33 +186,6 @@
     u"The zfs command signaled an error.")
 
 
-def _sync_command_error_squashed(arguments, logger):
-    """
-    Synchronously run a command-line tool with the given arguments.
-
-    :param arguments: A ``list`` of ``bytes``, command-line arguments to
-        execute.
-
-    :param eliot.Logger logger: The log writer to use to log errors running the
-        zfs command.
-    """
-    message = None
-    log_arguments = b" ".join(arguments)
-    try:
-        process = Popen(arguments, stdout=PIPE, stderr=STDOUT)
-        output = process.stdout.read()
-        status = process.wait()
-    except Exception as e:
-        message = ZFS_ERROR(
-            zfs_command=log_arguments, output=str(e), status=1)
-    else:
-        if status:
-            message = ZFS_ERROR(
-                zfs_command=log_arguments, output=output, status=status)
-    if message is not None:
-        message.write(logger)
-
-
 @attributes(["name"])
 class Snapshot(object):
     """
@@ -170,6 +229,8 @@ class Filesystem(object):
-    filesystem. This will likely grow into a more sophisticiated
+    filesystem. This will likely grow into a more sophisticated
     implementation over time.
     """
+    logger = Logger()
+
     def __init__(self, pool, dataset, mountpoint=None, size=None,
                  reactor=None):
         """
@@ -190,6 +251,7 @@
         if reactor is None:
             from twisted.internet import reactor
         self._reactor = reactor
+        self._async_lzc = _async_lzc(self._reactor)
 
     def _exists(self):
         """
@@ -198,11 +260,7 @@
         :return: ``True`` if there is a filesystem with this name, ``False``
            otherwise.
         """
-        try:
-            check_output([b"zfs", b"list", self.name], stderr=STDOUT)
-        except CalledProcessError:
-            return False
-        return True
+        return libzfs_core.lzc_exists(self.name)
 
     def snapshots(self):
         if self._exists():
@@ -241,39 +299,48 @@
         # I'm just using UUIDs, and hopefully requirements will become
         # clearer as we iterate.
         snapshot = b"%s@%s" % (self.name, uuid4())
-        check_call([b"zfs", b"snapshot", snapshot])
+        libzfs_core.lzc_snapshot([snapshot])
 
         # Determine whether there is a shared snapshot which can be used as the
         # basis for an incremental send.
         local_snapshots = list(
             Snapshot(name=name)
             for name in
-            _parse_snapshots(
-                check_output([b"zfs"] + _list_snapshots_command(self)),
-                self
-            ))
+            _parse_snapshots(_do_list_snapshots(self), self)
+        )
         if remote_snapshots is None:
             remote_snapshots = []
 
         latest_common_snapshot = _latest_common_snapshot(
             remote_snapshots, local_snapshots)
-
-        if latest_common_snapshot is None:
-            identifier = [snapshot]
-        else:
-            identifier = [
-                b"-i",
-                u"{}@{}".format(
-                    self.name, latest_common_snapshot.name).encode("ascii"),
-                snapshot,
-            ]
-
-        process = Popen([b"zfs", b"send"] + identifier, stdout=PIPE)
+        latest_common_name = None
+        if latest_common_snapshot is not None:
+            latest_common_name = b"%s@%s" % (self.name,
+                                             latest_common_snapshot.name)
+
+        (rfd, wfd) = os.pipe()
+        out = os.fdopen(rfd)
+        queue = Queue()
+
+        def send_and_close():
+            try:
+                libzfs_core.lzc_send(snapshot, latest_common_name, wfd)
+            except Exception as e:
+                # Not every exception carries an errno, so fall back to 1.
+                message = ZFS_ERROR(zfs_command="lzc_send " + snapshot,
+                                    output=str(e),
+                                    status=getattr(e, "errno", 1))
+                message.write(self.logger)
+                write_traceback(self.logger)
+            finally:
+                os.close(wfd)
+                queue.put(None)
+
+        d = self._async_lzc.callDeferred(send_and_close)
+        d.addBoth(lambda _: None)
         try:
-            yield process.stdout
+            yield out
         finally:
-            process.stdout.close()
-            process.wait()
+            out.close()
+            queue.get()
 
     @contextmanager
     def writer(self):
@@ -289,33 +356,56 @@
             # a hack. When we replace this mechanism with a proper API we
             # should make it include that information.
             #
-            # -e means "if the stream says it is for foo/bar/baz then receive
-            # into baz". I don't know why self.name is also required,
-            # then. XXX try -d self.pool instead. XXX it works without -e w/
-            # self.name too. XXX Delete this paragraph if we go ahead with just
-            # `-F` in the implementation.
-            #
-            # -F means force. If the stream is based on not-quite-the-latest
+            # If the stream is based on a not-quite-the-latest
             # snapshot then we have to throw away all the snapshots newer than
-            # it in order to receive the stream. To do that you have to
-            # force.
+            # it in order to receive the stream. Doing that requires
+            # force.
             #
-            cmd = [b"zfs", b"receive", b"-F", self.name]
+            force = True
         else:
             # If the filesystem doesn't already exist then this is a complete
             # data stream.
-            cmd = [b"zfs", b"receive", self.name]
-        process = Popen(cmd, stdin=PIPE)
-        succeeded = False
+            force = False
+
+        (rfd, wfd) = os.pipe()
+        wfile = os.fdopen(wfd, "w")
+        queue = Queue()
+
+        def recv_and_close():
+            try:
+                (header, c_header) = libzfs_core.receive_header(rfd)
+                # drr_toname is a full snapshot name, but we need only the part
+                # after '@', which we use to construct the local snapshot name.
+                snapname = header['drr_toname'].split('@', 1)[1]
+                snapname = self.name + '@' + snapname
+                libzfs_core.lzc_receive_with_header(snapname, rfd, c_header,
+                                                    force)
+                success = True
+            except Exception as e:
+                success = False
+                # Not every exception carries an errno, so fall back to 1.
+                message = ZFS_ERROR(zfs_command="lzc_receive " + self.name,
+                                    output=str(e),
+                                    status=getattr(e, "errno", 1))
+                message.write(self.logger)
+                write_traceback(self.logger)
+            finally:
+                os.close(rfd)
+                queue.put(success)
+
+        d = self._async_lzc.callDeferred(recv_and_close)
+        d.addBoth(lambda _: None)
         try:
-            yield process.stdin
+            yield wfile
         finally:
-            process.stdin.close()
-            succeeded = not process.wait()
-            if succeeded:
-                check_call([b"zfs", b"set",
-                            b"mountpoint=" + self._mountpoint.path,
-                            self.name])
+            try:
+                wfile.close()
+            except Exception:
+                pass
+            succeeded = queue.get()
+            if succeeded and not force:
+                # A new filesystem was received; give it a mountpoint and
+                # mount it.
+                libzfs_core.lzc_set_prop(self.name, b"mountpoint",
+                                         self._mountpoint.path)
+                check_call([b"zfs", b"mount", self.name])
 
 
 @implementer(IFilesystemSnapshots)
@@ -324,11 +414,12 @@ class ZFSSnapshots(object):
 
     def __init__(self, reactor, filesystem):
         self._reactor = reactor
+        self._async_lzc = _async_lzc(self._reactor)
         self._filesystem = filesystem
 
     def create(self, name):
         encoded_name = b"%s@%s" % (self._filesystem.name, name)
-        d = zfs_command(self._reactor, [b"snapshot", encoded_name])
+        d = self._async_lzc.lzc_snapshot([encoded_name])
         d.addCallback(lambda _: None)
         return d
 
@@ -339,56 +430,43 @@ def list(self):
         return _list_snapshots(self._reactor, self._filesystem)
 
 
-def _list_snapshots_command(filesystem):
+def _do_list_snapshots(filesystem):
     """
-    Construct a ``zfs`` command which will output the names of the snapshots of
-    the given filesystem.
+    Produce a list of snapshots of the given filesystem sorted by their
+    creation order.
 
     :param Filesystem filesystem: The ZFS filesystem the snapshots of which to
        list.
 
-    :return list: An argument list (of ``bytes``) which can be passed to
-        ``zfs`` to produce the desired list of snapshots. ``zfs`` is not
-        included as the first element.
+    :return list: A ``list`` of ``bytes`` corresponding to the
+        names of the snapshots.
     """
-    return [
-        b"list",
-        # Format the output without a header.
-        b"-H",
-        # Recurse to datasets beneath the named dataset.
-        b"-r",
-        # Only output datasets of type snapshot.
-        b"-t", b"snapshot",
-        # Only output the name of each dataset encountered. The name is the
-        # only thing we currently store in our snapshot model.
-        b"-o", b"name",
-        # Sort by the creation property. This gives us the snapshots in the
-        # order they were taken.
-        b"-s", b"creation",
-        # Start with this the dataset we're interested in.
-        filesystem.name,
-    ]
+    snaps = []
+    for snap in libzfs_core.lzc_list_snaps(filesystem.name):
+        creation = libzfs_core.lzc_get_props(snap)[b"createtxg"]
+        snaps.append((snap, creation))
+    return [x[0] for x in sorted(snaps, key=lambda x: x[1])]
 
 
 def _parse_snapshots(data, filesystem):
     """
-    Parse the output of a ``zfs list`` command (like the one defined by
-    ``_list_snapshots_command`` into a ``list`` of ``bytes`` (the snapshot
-    names only).
+    Transform the list of fully qualified snapshot names to a list of
+    snapshot short names that are relative to the given filesystem.
 
-    :param bytes data: The output to parse.
+    :param list data: A ``list`` of ``bytes`` corresponding to the names
+        of the snapshots.
 
     :param Filesystem filesystem: The filesystem from which to extract
         snapshots. If the output includes snapshots for other filesystems
        (eg siblings or children) they are excluded from the result.
:return list: A ``list`` of ``bytes`` corresponding to the - names of the snapshots in the output. The order of the list is the + short names of the snapshots. The order of the list is the same as the order of the snapshots in the data being parsed. """ result = [] - for line in data.splitlines(): - dataset, snapshot = line.split(b'@', 1) + for snap in data: + dataset, snapshot = snap.split(b'@', 1) if dataset == filesystem.name: result.append(snapshot) return result @@ -407,7 +485,7 @@ def _list_snapshots(reactor, filesystem): :return: A ``Deferred`` which fires with a ``list`` of ``Snapshot`` instances giving the requested snapshot information. """ - d = zfs_command(reactor, _list_snapshots_command(filesystem)) + d = _async_lzc(reactor).callDeferred(_do_list_snapshots, filesystem) d.addCallback(_parse_snapshots, filesystem) return d @@ -446,6 +524,7 @@ def __init__(self, reactor, name, mount_root): mounted. """ self._reactor = reactor + self._async_lzc = _async_lzc(self._reactor) self._name = name self._mount_root = mount_root @@ -467,18 +546,31 @@ def startService(self): # for StoragePool being an IService implementation). # https://clusterhq.atlassian.net/browse/FLOC-635 + # First, actually unmount the dataset. + # See the explanation below where 'canmount' is set to 'off'. + # At the moment all errors are ignored. + call([b"umount", self._name]) + # Set the root dataset to be read only; IService.startService # doesn't support Deferred results, and in any case startup can be # synchronous with no ill effects. - _sync_command_error_squashed( - [b"zfs", b"set", b"readonly=on", self._name], self.logger) + try: + libzfs_core.lzc_set_prop(self._name, b"readonly", 1) + except libzfs_core.exceptions.ZFSError as e: + message = ZFS_ERROR(zfs_command="set readonly=on " + self._name, + output=str(e), status=e.errno) + message.write(self.logger) # If the root dataset is read-only then it's not possible to create # mountpoints in it for its child datasets. Avoid mounting it to avoid # this problem. This should be fine since we don't ever intend to put # any actual data into the root dataset. 
-        _sync_command_error_squashed(
-            [b"zfs", b"set", b"canmount=off", self._name], self.logger)
+        try:
+            libzfs_core.lzc_set_prop(self._name, b"canmount", 0)
+        except libzfs_core.exceptions.ZFSError as e:
+            message = ZFS_ERROR(zfs_command="set canmount=off " + self._name,
+                                output=str(e), status=e.errno)
+            message.write(self.logger)
 
     def _check_for_out_of_space(self, reason):
         """
@@ -492,17 +584,15 @@ def create(self, volume):
         filesystem = self.get(volume)
         mount_path = filesystem.get_path().path
-        properties = [b"-o", b"mountpoint=" + mount_path]
+        properties = {b"mountpoint": mount_path}
         if volume.locally_owned():
-            properties.extend([b"-o", b"readonly=off"])
+            properties[b"readonly"] = 0
         if volume.size.maximum_size is not None:
-            properties.extend([
-                b"-o", u"refquota={0}".format(
-                    volume.size.maximum_size).encode("ascii")
-            ])
-        d = zfs_command(self._reactor,
-                        [b"create"] + properties + [filesystem.name])
+            properties[b"refquota"] = volume.size.maximum_size
+        d = self._async_lzc.lzc_create(filesystem.name, props=properties)
         d.addErrback(self._check_for_out_of_space)
+        d.addCallback(
+            lambda _: zfs_command(self._reactor, [b"mount", filesystem.name]))
         d.addCallback(lambda _: filesystem)
         return d
 
@@ -513,27 +603,25 @@ def destroy(self, volume):
         # It would be better to have snapshot destruction logic as part of
         # IFilesystemSnapshots, but that isn't really necessary yet.
         def got_snapshots(snapshots):
-            return gatherResults(list(zfs_command(
-                self._reactor,
-                [b"destroy", b"%s@%s" % (filesystem.name, snapshot.name)])
-                for snapshot in snapshots))
+            return self._async_lzc.lzc_destroy_snaps([
+                b"%s@%s" % (filesystem.name, snapshot.name)
+                for snapshot in snapshots
+            ], defer=False)
 
         d.addCallback(got_snapshots)
-        d.addCallback(lambda _: zfs_command(
-            self._reactor, [b"destroy", filesystem.name]))
+        d.addCallback(
+            lambda _: ext_command(self._reactor, [b"umount", filesystem.name]))
+        d.addCallback(
+            lambda _: self._async_lzc.lzc_destroy(filesystem.name))
        return d
 
     def set_maximum_size(self, volume):
         filesystem = self.get(volume)
-        properties = []
         if volume.size.maximum_size is not None:
-            properties.extend([
-                u"refquota={0}".format(
-                    volume.size.maximum_size).encode("ascii")
-            ])
+            refquota = volume.size.maximum_size
         else:
-            properties.extend([u"refquota=none"])
-        d = zfs_command(self._reactor,
-                        [b"set"] + properties + [filesystem.name])
+            # Zero means no quota.
+            refquota = 0
+        d = self._async_lzc.lzc_set_prop(filesystem.name, b"refquota",
+                                         refquota)
         d.addErrback(self._check_for_out_of_space)
         d.addCallback(lambda _: filesystem)
         return d
@@ -544,13 +632,9 @@ def clone_to(self, parent, volume):
         zfs_snapshots = ZFSSnapshots(self._reactor, parent_filesystem)
         snapshot_name = bytes(uuid4())
         d = zfs_snapshots.create(snapshot_name)
-        clone_command = [b"clone",
-                         # Snapshot we're cloning from:
-                         b"%s@%s" % (parent_filesystem.name, snapshot_name),
-                         # New filesystem we're cloning to:
-                         new_filesystem.name,
-                         ]
-        d.addCallback(lambda _: zfs_command(self._reactor, clone_command))
+        full_snap_name = b"%s@%s" % (parent_filesystem.name, snapshot_name)
+        d.addCallback(lambda _: self._async_lzc.lzc_clone(new_filesystem.name,
+                                                          full_snap_name))
         self._created(d, volume)
         d.addCallback(lambda _: new_filesystem)
         return d
@@ -558,8 +642,10 @@ def change_owner(self, volume, new_volume):
         old_filesystem = self.get(volume)
         new_filesystem = self.get(new_volume)
-        d = zfs_command(self._reactor,
-                        [b"rename", old_filesystem.name,
-                         new_filesystem.name])
+        d = ext_command(self._reactor,
+                        [b"umount", old_filesystem.name])
+        d.addCallback(lambda _: self._async_lzc.lzc_rename(
+            old_filesystem.name, new_filesystem.name))
         self._created(d, new_volume)
 
         def remounted(ignored):
@@ -587,7 +673,7 @@ def _created(self, result, new_volume):
         new_mount_path = new_filesystem.get_path().path
 
         def creation_failed(f):
-            if f.check(CommandFailed):
+            if f.check(libzfs_core.exceptions.FilesystemExists):
                 # This isn't the only reason the operation could fail. We
                 # should figure out why and report it appropriately.
                 # https://clusterhq.atlassian.net/browse/FLOC-199
@@ -597,16 +683,15 @@ def creation_failed(f):
         def exists(ignored):
             if new_volume.locally_owned():
-                result = zfs_command(self._reactor,
-                                     [b"set", b"readonly=off",
-                                      new_filesystem.name])
+                result = self._async_lzc.lzc_set_prop(new_filesystem.name,
+                                                      b"readonly", 0)
             else:
-                result = zfs_command(self._reactor,
-                                     [b"inherit", b"readonly",
-                                      new_filesystem.name])
-            result.addCallback(lambda _: zfs_command(self._reactor,
-                [b"set", b"mountpoint=" + new_mount_path,
-                 new_filesystem.name]))
+                result = self._async_lzc.lzc_inherit_prop(new_filesystem.name,
+                                                          b"readonly")
+            result.addCallback(lambda _: self._async_lzc.lzc_set_prop(
+                new_filesystem.name, b"mountpoint", new_mount_path))
+            result.addCallback(lambda _: zfs_command(
+                self._reactor, [b"mount", new_filesystem.name]))
             return result
 
         result.addCallback(exists)
@@ -617,7 +702,7 @@ def get(self, volume):
         self._name, dataset, mount_path, volume.size)
 
     def enumerate(self):
-        listing = _list_filesystems(self._reactor, self._name)
+        listing = self._async_lzc.callDeferred(_list_filesystems, self._name)
 
         def listed(filesystems):
             result = set()
@@ -643,40 +728,20 @@ class _DatasetInfo(object):
     """
 
 
-def _list_filesystems(reactor, pool):
+def _list_filesystems(pool):
     """Get a listing of all filesystems on a given pool.
 
-    :param pool: A `flocker.volume.filesystems.interface.IStoragePool`
-        provider.
+    :param bytes pool: The name of the pool whose filesystems to list.
 
-    :return: A ``Deferred`` that fires with an iterator, the elements
-        of which are ``tuples`` containing the name and mountpoint of each
-        filesystem.
+    :return: An iterator of ``_DatasetInfo`` instances giving the name,
+        mountpoint and refquota of each filesystem.
""" - listing = zfs_command( - reactor, - [b"list", - # Descend the hierarchy to a depth of one (ie, list the direct - # children of the pool) - b"-d", b"1", - # Omit the output header - b"-H", - # Output exact, machine-parseable values (eg 65536 instead of 64K) - b"-p", - # Output each dataset's name, mountpoint and refquota - b"-o", b"name,mountpoint,refquota", - # Look at this pool - pool]) - - def listed(output, pool): - for line in output.splitlines(): - name, mountpoint, refquota = line.split(b'\t') - name = name[len(pool) + 1:] - if name: - refquota = int(refquota.decode("ascii")) - if refquota == 0: - refquota = None - yield _DatasetInfo( - dataset=name, mountpoint=mountpoint, refquota=refquota) - - listing.addCallback(listed, pool) - return listing + for child in libzfs_core.lzc_list_children(pool): + props = libzfs_core.lzc_get_props(child) + name = child[len(pool) + 1:] + refquota = props[b"refquota"] + mountpoint = props[b"mountpoint"] + if refquota == 0: + refquota = None + yield _DatasetInfo( + dataset=name, mountpoint=mountpoint, refquota=refquota) diff --git a/flocker/volume/script.py b/flocker/volume/script.py index c667c9cf73..7d79c0865e 100644 --- a/flocker/volume/script.py +++ b/flocker/volume/script.py @@ -6,7 +6,8 @@ from twisted.python.usage import Options from twisted.python.filepath import FilePath -from twisted.internet.defer import succeed, maybeDeferred +from twisted.internet import task +from twisted.internet.defer import succeed from zope.interface import implementer @@ -240,7 +241,7 @@ def main(self, reactor, options, service): documentation. """ if options.subCommand is not None: - return maybeDeferred(options.subOptions.run, service) + return task.deferLater(reactor, 0, options.subOptions.run, service) else: return succeed(None) diff --git a/flocker/volume/test/test_filesystems_zfs.py b/flocker/volume/test/test_filesystems_zfs.py index c4b7dd6454..eee8a378f4 100644 --- a/flocker/volume/test/test_filesystems_zfs.py +++ b/flocker/volume/test/test_filesystems_zfs.py @@ -13,10 +13,7 @@ from twisted.python.failure import Failure from twisted.python.filepath import FilePath -from eliot import Logger -from eliot.testing import ( - LoggedMessage, validateLogging, assertHasMessage, - ) +from eliot.testing import LoggedMessage, assertHasMessage from ...testtools import ( FakeProcessReactor, assert_equal_comparison, assert_not_equal_comparison @@ -24,9 +21,8 @@ from ..filesystems.zfs import ( _DatasetInfo, - zfs_command, CommandFailed, BadArguments, Filesystem, ZFSSnapshots, - _sync_command_error_squashed, _latest_common_snapshot, ZFS_ERROR, - Snapshot, + zfs_command, CommandFailed, BadArguments, Filesystem, + _latest_common_snapshot, ZFS_ERROR, Snapshot, ) @@ -176,154 +172,6 @@ def no_such_executable_logged(case, logger): case.assertEqual(len(LoggedMessage.ofType(logger.messages, ZFS_ERROR)), 1) -def error_status_logged(case, logger): - """ - Validate the error logging behavior of ``_sync_command_error_squashed``. - """ - assertHasMessage(case, logger, ZFS_ERROR, { - 'status': 1, - 'zfs_command': 'python -c raise SystemExit(1)', - 'output': ''}) - case.assertEqual(len(LoggedMessage.ofType(logger.messages, ZFS_ERROR)), 1) - - -class SyncCommandTests(SynchronousTestCase): - """ - Tests for ``_sync_command_error_squashed``. - """ - @validateLogging(no_such_executable_logged) - def test_no_such_executable(self, logger): - """ - If the executable specified to ``_sync_command_error_squashed`` cannot - be found then the function nevertheless returns ``None``. 
- """ - result = _sync_command_error_squashed( - [b"nonsense garbage made up no such command"], - logger) - self.assertIs(None, result) - - @validateLogging(error_status_logged) - def test_error_exit(self, logger): - """ - If the child process run by ``_sync_command_error_squashed`` exits with - an an error status then the function nevertheless returns ``None``. - """ - result = _sync_command_error_squashed( - [b"python", b"-c", b"raise SystemExit(1)"], - logger) - self.assertIs(None, result) - - def test_success(self): - """ - ``_sync_command_error_squashed`` runs the given command and returns - ``None``. - """ - result = _sync_command_error_squashed( - [b"python", b"-c", b""], - Logger()) - self.assertIs(None, result) - - -class ZFSSnapshotsTests(SynchronousTestCase): - """Unit tests for ``ZFSSnapshotsTests``.""" - - def test_create(self): - """ - ``ZFSSnapshots.create()`` calls the ``zfs snapshot`` command with the - given ``bytes`` as the snapshot name. - """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"pool", "fs")) - snapshots.create(b"myname") - arguments = reactor.processes[0] - self.assertEqual(arguments.args, [b"zfs", b"snapshot", - b"pool/fs@myname"]) - - def test_create_no_result_yet(self): - """ - The result of ``ZFSSnapshots.create()`` is a ``Deferred`` that does not - fire if the creation is unfinished. - """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"mypool", None)) - d = snapshots.create(b"name") - self.assertNoResult(d) - - def test_create_result(self): - """ - The result of ``ZFSSnapshots.create()`` is a ``Deferred`` that fires - when creation has finished. - """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"mypool", None)) - d = snapshots.create(b"name") - reactor.processes[0].processProtocol.processEnded( - Failure(ProcessDone(0))) - self.assertEqual(self.successResultOf(d), None) - - def test_list(self): - """ - ``ZFSSnapshots.list()`` calls the ``zfs list`` command with the pool - name. - """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"mypool", None)) - snapshots.list() - self.assertEqual(reactor.processes[0].args, - [b"zfs", b"list", b"-H", b"-r", b"-t", b"snapshot", - b"-o", b"name", b"-s", b"creation", b"mypool"]) - - def test_list_result_root_dataset(self): - """ - ``ZFSSnapshots.list`` parses out the snapshot names of the root dataset - from the results of the command. - """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"mypool", None)) - - d = snapshots.list() - process_protocol = reactor.processes[0].processProtocol - process_protocol.childDataReceived(1, b"mypool@name\n") - process_protocol.childDataReceived(1, b"mypool@name2\n") - reactor.processes[0].processProtocol.processEnded( - Failure(ProcessDone(0))) - self.assertEqual(self.successResultOf(d), [b"name", b"name2"]) - - def test_list_result_child_dataset(self): - """ - ``ZFSSnapshots.list`` parses out the snapshot names of a non-root - dataset from the results of the command. 
- """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"mypool", b"myfs")) - - d = snapshots.list() - process_protocol = reactor.processes[0].processProtocol - process_protocol.childDataReceived(1, b"mypool/myfs@name\n") - process_protocol.childDataReceived(1, b"mypool/myfs@name2\n") - reactor.processes[0].processProtocol.processEnded( - Failure(ProcessDone(0))) - self.assertEqual(self.successResultOf(d), [b"name", b"name2"]) - - def test_list_result_ignores_other_pools(self): - """ - ``ZFSSnapshots.list`` skips snapshots of other pools. - - In particular, we are likely to see snapshot names of sub-pools in - the output. - """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"mypool", None)) - - d = snapshots.list() - process_protocol = reactor.processes[0].processProtocol - process_protocol.childDataReceived(1, b"mypool/child@name\n") - process_protocol.childDataReceived(1, b"mypool@name2\n") - reactor.processes[0].processProtocol.processEnded( - Failure(ProcessDone(0))) - self.assertEqual(self.successResultOf(d), [b"name2"]) - - class LatestCommonSnapshotTests(SynchronousTestCase): """ Tests for ``_latest_common_snapshot``. diff --git a/requirements.txt b/requirements.txt index a6deb77a46..4a9d88a0ae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -42,6 +42,7 @@ python-keystoneclient==1.4.0 python-keystoneclient-rackspace==0.1.3 python-novaclient==2.24.1 pytz==2015.4 +pyzfs==0.2.1 PyYAML==3.10 repoze.lru==0.6 requests==2.7.0