some updates

This commit is contained in:
ge
2023-09-23 21:24:56 +03:00
parent 43033b5a0d
commit 2870708365
25 changed files with 367 additions and 453 deletions

5
computelib/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from .config import ConfigLoader
from .exceptions import *
from .session import LibvirtSession
from .vm import *
from .volume import *

108
computelib/cli/vmctl.py Normal file
View File

@ -0,0 +1,108 @@
"""
Manage virtual machines.
Usage: na-vmctl [options] status <machine>
na-vmctl [options] is-running <machine>
na-vmctl [options] start <machine>
na-vmctl [options] shutdown <machine>
na-vmctl [options] set-vcpus <machine> <nvcpus>
na-vmctl [options] set-memory <machine> <memory>
na-vmctl [options] list [-a|--all]
Options:
-c, --config <file> config file [default: /etc/node-agent/config.yaml]
-l, --loglvl <lvl> logging level
-a, --all list all machines including inactive
"""
import logging
import pathlib
import sys
import libvirt
from docopt import docopt
from ..exceptions import VMError, VMNotFound
from ..session import LibvirtSession
from ..vm import VirtualMachine
logger = logging.getLogger(__name__)
levels = logging.getLevelNamesMapping()
libvirt.registerErrorHandler(lambda userdata, err: None, ctx=None)
class Color:
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
NONE = '\033[0m'
class Table:
def __init__(self, whitespace: str = '\t'):
self.__rows = []
self.__whitespace = whitespace
def header(self, columns: list):
self.__rows.insert(0, [str(col) for col in columns])
def row(self, row: list):
self.__rows.append([str(col) for col in row])
def rows(self, rows: list):
for row in rows:
self.row(row)
def print(self):
widths = [max(map(len, col)) for col in zip(*self.__rows)]
for row in self.__rows:
print(self.__whitespace.join(
(val.ljust(width) for val, width in zip(row, widths))))
def cli():
args = docopt(__doc__)
config = pathlib.Path(args['--config']) or None
loglvl = None
machine = args['<machine>']
if args['--loglvl']:
loglvl = args['--loglvl'].upper()
if loglvl in levels:
logging.basicConfig(level=levels[loglvl])
with LibvirtSession() as session:
try:
if args['list']:
table = Table()
table.header(['NAME', 'STATE', 'AUTOSTART'])
for vm_ in session.list_machines():
table.row([vm_.name, vm_.status, vm_.is_autostart])
table.print()
sys.exit()
vm = session.get_machine(machine)
if args['status']:
print(vm.status)
if args['is-running']:
if vm.is_running:
print('running')
else:
sys.exit(vm.status)
if args['start']:
vm.start()
print(f'{vm.name} started')
if args['shutdown']:
vm.shutdown('NORMAL')
except VMNotFound as nferr:
sys.exit(f'{Color.RED}VM {machine} not found.{Color.NONE}')
except VMError as vmerr:
sys.exit(f'{Color.RED}{vmerr}{Color.NONE}')
if __name__ == '__main__':
cli()

84
computelib/cli/vmexec.py Normal file
View File

@ -0,0 +1,84 @@
"""
Execute shell commands on guest via guest agent.
Usage: na-vmexec [options] <machine> <command>
Options:
-c, --config <file> config file [default: /etc/node-agent/config.yaml]
-l, --loglvl <lvl> logging level
-s, --shell <shell> guest shell [default: /bin/sh]
-t, --timeout <sec> QEMU timeout in seconds to stop polling command status [default: 60]
-p, --pid <PID> PID on guest to poll output
"""
import logging
import pathlib
import sys
import libvirt
from docopt import docopt
from ..exceptions import GuestAgentError, VMNotFound
from ..session import LibvirtSession
from ..vm import GuestAgent
logger = logging.getLogger(__name__)
levels = logging.getLevelNamesMapping()
libvirt.registerErrorHandler(lambda userdata, err: None, ctx=None)
class Color:
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
NONE = '\033[0m'
# TODO: Add STDIN support e.g.: cat something.sh | na-vmexec vmname bash
def cli():
args = docopt(__doc__)
config = pathlib.Path(args['--config']) or None
loglvl = None
machine = args['<machine>']
if args['--loglvl']:
loglvl = args['--loglvl'].upper()
if loglvl in levels:
logging.basicConfig(level=levels[loglvl])
with LibvirtSession() as session:
shell = args['--shell']
cmd = args['<command>']
try:
ga = session.get_guest_agent(machine)
exited, exitcode, stdout, stderr = ga.shellexec(
cmd, executable=shell, capture_output=True, decode_output=True,
timeout=int(args['--timeout']))
except GuestAgentError as gaerr:
errmsg = f'{Color.RED}{gaerr}{Color.NONE}'
if str(gaerr).startswith('Polling command pid='):
errmsg = (errmsg + Color.YELLOW +
'\n[NOTE: command may still running on guest '
'pid={ga.last_pid}]' + Color.NONE)
sys.exit(errmsg)
except VMNotFound as err:
sys.exit(f'{Color.RED}VM {machine} not found{Color.NONE}')
if not exited:
print(Color.YELLOW +
'[NOTE: command may still running on guest pid={ga.last_pid}]' +
Color.NONE, file=sys.stderr)
if stderr:
print(stderr.strip(), file=sys.stderr)
if stdout:
print(stdout.strip(), file=sys.stdout)
sys.exit(exitcode)
if __name__ == '__main__':
cli()

0
computelib/cli/volctl.py Normal file
View File

36
computelib/config.py Normal file
View File

@ -0,0 +1,36 @@
import os
import tomllib
from collections import UserDict
from pathlib import Path
from .exceptions import ConfigLoaderError
NODEAGENT_CONFIG_FILE = os.getenv('NODEAGENT_CONFIG_FILE')
NODEAGENT_DEFAULT_CONFIG_FILE = '/etc/node-agent/config.toml'
class ConfigLoader(UserDict):
def __init__(self, file: Path | None = None):
if file is None:
file = NODEAGENT_CONFIG_FILE or NODEAGENT_DEFAULT_CONFIG_FILE
self.file = Path(file)
super().__init__(self._load())
# todo: load deafult configuration
def _load(self) -> dict:
try:
with open(self.file, 'rb') as config:
return tomllib.load(config)
# todo: config schema validation
except tomllib.TOMLDecodeError as tomlerr:
raise ConfigLoaderError(
f'Bad TOML syntax in config file: {self.file}: {tomlerr}'
) from tomlerr
except (OSError, ValueError) as readerr:
raise ConfigLoaderError(
f'Cannot read config file: {self.file}: {readerr}') from readerr
def reload(self) -> None:
self.data = self._load()

22
computelib/exceptions.py Normal file
View File

@ -0,0 +1,22 @@
class ConfigLoaderError(Exception):
"""Bad config file syntax, unreachable file or bad config schema."""
class LibvirtSessionError(Exception):
"""Something went wrong while connecting to libvirtd."""
class VMError(Exception):
"""Something went wrong while interacting with the domain."""
class VMNotFound(VMError):
"""Virtual machine not found on node."""
class GuestAgentError(Exception):
"""Mostly QEMU Guest Agent is not responding."""
class StoragePoolError(Exception):
"""Something went wrong when operating with storage pool."""

52
computelib/session.py Normal file
View File

@ -0,0 +1,52 @@
from contextlib import AbstractContextManager
import libvirt
from .exceptions import LibvirtSessionError, VMNotFound
from .vm import GuestAgent, VirtualMachine
from .volume import StoragePool
class LibvirtSession(AbstractContextManager):
def __init__(self, uri: str = 'qemu:///system'):
try:
self.connection = libvirt.open(uri)
except libvirt.libvirtError as err:
raise LibvirtSessionError(err) from err
def __enter__(self):
return self
def __exit__(self, exception_type, exception_value, exception_traceback):
self.close()
def get_machine(self, name: str) -> VirtualMachine:
try:
return VirtualMachine(self.connection.lookupByName(name))
except libvirt.libvirtError as err:
if err.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
raise VMNotFound(name) from err
raise LibvirtSessionError(err) from err
def list_machines(self) -> list[VirtualMachine]:
return [VirtualMachine(dom) for dom in
self.connection.listAllDomains()]
def get_guest_agent(self, name: str,
timeout: int | None = None) -> GuestAgent:
try:
return GuestAgent(self.connection.lookupByName(name), timeout)
except libvirt.libvirtError as err:
if err.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
raise VMNotFound(name) from err
raise LibvirtSessionError(err) from err
def get_storage_pool(self, name: str) -> StoragePool:
return StoragePool(self.connection.storagePoolLookupByName(name))
def list_storage_pools(self) -> list[StoragePool]:
return [StoragePool(p) for p in self.connection.listStoragePools()]
def close(self) -> None:
self.connection.close()

View File

@ -0,0 +1 @@
from . import mac, xml

10
computelib/utils/mac.py Normal file
View File

@ -0,0 +1,10 @@
import random
def random_mac() -> str:
"""Retrun random MAC address."""
mac = [0x00, 0x16, 0x3e,
random.randint(0x00, 0x7f),
random.randint(0x00, 0xff),
random.randint(0x00, 0xff)]
return ':'.join(map(lambda x: "%02x" % x, mac))

73
computelib/utils/xml.py Normal file
View File

@ -0,0 +1,73 @@
from lxml.etree import Element, QName, SubElement
class Constructor:
"""
The XML constructor. This class builds XML configs for libvirt.
"""
def construct_xml(self,
tag: dict,
namespace: str | None = None,
nsprefix: str | None = None,
root: Element = None) -> Element:
"""
Shortly this recursive function transforms dictonary to XML.
Return etree.Element built from dict with following structure::
{
'name': 'device', # tag name
'text': '', # optional key
'values': { # optional key, must be a dict of key-value pairs
'type': 'disk'
},
children: [] # optional key, must be a list of dicts
}
Child elements must have the same structure. Infinite `children` nesting
is allowed.
"""
use_ns = False
if isinstance(namespace, str) and isinstance(nsprefix, str):
use_ns = True
# Create element
if root is None:
if use_ns:
element = Element(QName(namespace, tag['name']),
nsmap={nsprefix: namespace})
else:
element = Element(tag['name'])
else:
if use_ns:
element = SubElement(root, QName(namespace, tag['name']))
else:
element = SubElement(root, tag['name'])
# Fill up element with content
if 'text' in tag.keys():
element.text = tag['text']
if 'values' in tag.keys():
for key in tag['values'].keys():
element.set(str(key), str(tag['values'][key]))
if 'children' in tag.keys():
for child in tag['children']:
element.append(
self.construct_xml(child,
namespace=namespace,
nsprefix=nsprefix,
root=element))
return element
def add_meta(self, xml: Element, data: dict,
namespace: str, nsprefix: str) -> None:
"""
Add metadata to domain. See:
https://libvirt.org/formatdomain.html#general-metadata
"""
metadata = metadata_old = xml.xpath('/domain/metadata')[0]
metadata.append(
self.construct_xml(
data,
namespace=namespace,
nsprefix=nsprefix,
))
xml.replace(metadata_old, metadata)

View File

@ -0,0 +1,3 @@
from .guest_agent import GuestAgent
from .installer import VirtualMachineInstaller
from .virtual_machine import VirtualMachine

30
computelib/vm/base.py Normal file
View File

@ -0,0 +1,30 @@
import libvirt
from ..exceptions import VMError
class VirtualMachineBase:
def __init__(self, domain: libvirt.virDomain):
self.domain = domain
self.domain_name = self._get_domain_name()
self.domain_info = self._get_domain_info()
def _get_domain_name(self):
try:
return self.domain.name()
except libvirt.libvirtError as err:
raise VMError(f'Cannot get domain name: {err}') from err
def _get_domain_info(self):
try:
info = self.domain.info()
return {
'state': info[0],
'max_memory': info[1],
'memory': info[2],
'nproc': info[3],
'cputime': info[4]
}
except libvirt.libvirtError as err:
raise VMError(f'Cannot get domain info: {err}') from err

View File

@ -0,0 +1,179 @@
import json
import logging
from base64 import b64decode, standard_b64encode
from time import sleep, time
import libvirt
import libvirt_qemu
from ..exceptions import GuestAgentError
from .base import VirtualMachineBase
logger = logging.getLogger(__name__)
QEMU_TIMEOUT = 60 # in seconds
POLL_INTERVAL = 0.3 # also in seconds
class GuestAgent(VirtualMachineBase):
"""
Interacting with QEMU guest agent. Methods:
execute()
Low-level method for executing QEMU command as dict. Command dict
internally converts to JSON. See method docstring for more info.
shellexec()
High-level method for executing shell commands on guest. Command
must be passed as string. Wraps execute() method.
TODO:
check() method. Ping guest agent and check supported commands.
"""
def __init__(self, domain: libvirt.virDomain, timeout: int | None = None,
flags: int | None = None):
super().__init__(domain)
self.timeout = timeout or QEMU_TIMEOUT # timeout for guest agent
self.flags = flags or libvirt_qemu.VIR_DOMAIN_QEMU_MONITOR_COMMAND_DEFAULT
self.last_pid = None
def execute(self,
command: dict,
stdin: str | None = None,
capture_output: bool = False,
decode_output: bool = False,
wait: bool = True,
timeout: int = QEMU_TIMEOUT
) -> tuple[bool | None, int | None, str | None, str | None]:
"""
Execute command on guest and return output if `capture_output` is True.
See https://wiki.qemu.org/Documentation/QMP for QEMU commands reference.
If `wait` is True poll guest command output with POLL_INTERVAL. Raise
GuestAgentError on `timeout` reached (in seconds).
Return values:
tuple(
exited: bool | None,
exitcode: int | None,
stdout: str | None,
stderr: str | None
)
stdout and stderr are base64 encoded strings or None. stderr and stdout
will be decoded if `decode_output` is True.
"""
# todo command dict schema validation
if capture_output:
command['arguments']['capture-output'] = True
if isinstance(stdin, str):
command['arguments']['input-data'] = standard_b64encode(
stdin.encode('utf-8')).decode('utf-8')
# Execute command on guest
cmd_out = self._execute(command)
if capture_output:
self.last_pid = json.loads(cmd_out)['return']['pid']
return self._get_cmd_result(
self.last_pid,
decode_output=decode_output,
wait=wait,
timeout=timeout,
)
return None, None, None, None
def shellexec(self,
command: str,
stdin: str | None = None,
executable: str = '/bin/sh',
capture_output: bool = False,
decode_output: bool = False,
wait: bool = True,
timeout: int = QEMU_TIMEOUT
) -> tuple[bool | None, int | None, str | None, str | None]:
"""
Execute command on guest with selected shell. /bin/sh by default.
Otherwise of execute() this function brings shell command as string.
"""
cmd = {
'execute': 'guest-exec',
'arguments': {
'path': executable,
'arg': ['-c', command],
}
}
return self.execute(
cmd,
stdin=stdin,
capture_output=capture_output,
decode_output=decode_output,
wait=wait,
timeout=timeout,
)
def poll_pid(self, pid: int):
# Нужно цепляться к PID и вывести результат
pass
def _execute(self, command: dict):
logging.debug('Execute command: vm=%s cmd=%s', self.domain_name,
command)
if self.domain_info['state'] != libvirt.VIR_DOMAIN_RUNNING:
raise GuestAgentError(
f'Cannot execute command: vm={self.domain_name} is not running')
try:
return libvirt_qemu.qemuAgentCommand(
self.domain, # virDomain object
json.dumps(command),
self.timeout,
self.flags,
)
except libvirt.libvirtError as err:
raise GuestAgentError(
f'Cannot execute command on vm={self.domain_name}: {err}'
) from err
def _get_cmd_result(
self, pid: int, decode_output: bool = False, wait: bool = True,
timeout: int = QEMU_TIMEOUT):
"""Get executed command result. See GuestAgent.execute() for info."""
cmd = {'execute': 'guest-exec-status', 'arguments': {'pid': pid}}
if not wait:
output = json.loads(self._execute(cmd))
return self._return_tuple(output, decode=decode_output)
logger.debug('Start polling command pid=%s on vm=%s', pid,
self.domain_name)
start_time = time()
while True:
output = json.loads(self._execute(cmd))
if output['return']['exited']:
break
sleep(POLL_INTERVAL)
now = time()
if now - start_time > timeout:
raise GuestAgentError(
f'Polling command pid={pid} on vm={self.domain_name} '
f'took longer than {timeout} seconds.'
)
logger.debug('Polling command pid=%s on vm=%s finished, '
'time taken: %s seconds',
pid, self.domain_name, int(time() - start_time))
return self._return_tuple(output, decode=decode_output)
def _return_tuple(self, output: dict, decode: bool = False):
output = output['return']
exited = output['exited']
exitcode = output['exitcode']
stdout = stderr = None
if 'out-data' in output.keys():
stdout = output['out-data']
if 'err-data' in output.keys():
stderr = output['err-data']
if decode:
stdout = b64decode(stdout).decode('utf-8') if stdout else None
stderr = b64decode(stderr).decode('utf-8') if stderr else None
return exited, exitcode, stdout, stderr

168
computelib/vm/installer.py Normal file
View File

@ -0,0 +1,168 @@
import textwrap
from dataclasses import dataclass
from enum import Enum
from lxml import etree
from lxml.builder import E
from ..utils import mac
from ..volume import DiskInfo, VolumeInfo
@dataclass
class VirtualMachineInfo:
name: str
title: str
memory: int
vcpus: int
machine: str
emulator: str
arch: str
cpu: str # CPU full XML description
mac: str
description: str = ''
boot_order: tuple = ('cdrom', 'hd')
def to_xml(self) -> str:
xml = E.domain(
E.name(self.name),
E.title(self.title),
E.description(self.description),
E.metadata(),
E.memory(str(self.memory), unit='MB'),
E.currentMemory(str(self.memory), unit='MB'),
E.vcpu(str(self.vcpus), placement='static'),
type='kvm')
os = E.os(E.type('hvm', machine=self.machine, arch=self.arch))
for dev in self.boot_order:
os.append(E.boot(dev=dev))
xml.append(os)
xml.append(E.features(E.acpi(), E.apic()))
xml.append(etree.fromstring(self.cpu))
xml.append(E.on_poweroff('destroy'))
xml.append(E.on_reboot('restart'))
xml.append(E.on_crash('restart'))
xml.append(E.pm(
E('suspend-to-mem', enabled='no'),
E('suspend-to-disk', enabled='no'))
)
devices = E.devices()
devices.append(E.emulator(self.emulator))
devices.append(E.interface(
E.source(network='default'),
E.mac(address=self.mac),
type='network'))
devices.append(E.graphics(type='vnc', port='-1', autoport='yes'))
devices.append(E.input(type='tablet', bus='usb'))
devices.append(E.channel(
E.source(mode='bind'),
E.target(type='virtio', name='org.qemu.guest_agent.0'),
E.address(type='virtio-serial', controller='0', bus='0', port='1'),
type='unix')
)
devices.append(E.console(
E.target(type='serial', port='0'),
type='pty')
)
devices.append(E.video(
E.model(type='vga', vram='16384', heads='1', primary='yes'))
)
xml.append(devices)
return etree.tostring(xml, encoding='unicode', pretty_print=True)
class CPUMode(Enum):
HOST_MODEL = 'host-model'
HOST_PASSTHROUGH = 'host-passthrough'
CUSTOM = 'custom'
MAXIMUM = 'maximum'
@classmethod
def default(cls):
return cls.HOST_PASSTHROUGH
@dataclass
class CPUTopology:
sockets: int
cores: int
threads: int
def validate(self, vcpus: int) -> None:
if self.sockets * self.cores * self.threads == vcpus:
return
raise ValueError("CPU topology must match the number of 'vcpus'")
class VirtualMachineInstaller:
def __init__(self, session: 'LibvirtSession'):
self.session = session
self.connection = session.connection # libvirt.virConnect object
self.domcaps = etree.fromstring(
self.connection.getDomainCapabilities())
self.arch = self.domcaps.xpath('/domainCapabilities/arch/text()')[0]
self.virttype = self.domcaps.xpath(
'/domainCapabilities/domain/text()')[0]
self.emulator = self.domcaps.xpath(
'/domainCapabilities/path/text()')[0]
self.machine = self.domcaps.xpath(
'/domainCapabilities/machine/text()')[0]
def install(self, data: 'VirtualMachineSchema'):
xml_cpu = self._choose_best_cpu(CPUMode.default())
xml_vm = VirtualMachineInfo(
name=data['name'],
title=data['title'],
vcpus=data['vcpus'],
memory=data['memory'],
machine=self.machine,
emulator=self.emulator,
arch=self.arch,
cpu=xml_cpu,
mac=mac.random_mac()
).to_xml()
self._define(xml_vm)
storage_pool = self.session.get_storage_pool('default')
etalon_vol = storage_pool.get_volume('bookworm.qcow2')
new_vol = VolumeInfo(
name=data['name'] +
'_disk_some_pattern.qcow2',
path=storage_pool.path +
'/' +
data['name'] +
'_disk_some_pattern.qcow2',
capacity=data['volume']['capacity'])
etalon_vol.clone(new_vol)
vm = self.session.get_machine(data['name'])
vm.attach_device(DiskInfo(path=new_vol.path, target='vda'))
vm.set_vcpus(data['vcpus'])
vm.set_memory(data['memory'])
vm.start()
vm.set_autostart(enabled=True)
def _choose_best_cpu(self, mode: CPUMode) -> str:
if mode == 'host-passthrough':
xml = '<cpu mode="host-passthrough" migratable="on"/>'
elif mode == 'maximum':
xml = '<cpu mode="maximum" migratable="on"/>'
elif mode in ['host-model', 'custom']:
cpus = self.domcaps.xpath(
f'/domainCapabilities/cpu/mode[@name="{mode}"]')[0]
cpus.tag = 'cpu'
for attr in cpus.attrib.keys():
del cpus.attrib[attr]
arch = etree.SubElement(cpus, 'arch')
arch.text = self.arch
xmlcpus = etree.tostring(
cpus, encoding='unicode', pretty_print=True)
xml = self.connection.baselineHypervisorCPU(
self.emulator, self.arch, self.machine, self.virttype, [xmlcpus])
else:
raise ValueError(
f'CPU mode must be in {[v.value for v in CPUMode]}, '
f"but passed '{mode}'")
return textwrap.indent(xml, ' ' * 2)
def _define(self, xml: str) -> None:
self.connection.defineXML(xml)

View File

@ -0,0 +1,233 @@
import logging
import libvirt
from ..exceptions import VMError
from ..volume import VolumeInfo
from .base import VirtualMachineBase
logger = logging.getLogger(__name__)
class VirtualMachine(VirtualMachineBase):
@property
def name(self):
return self.domain_name
@property
def status(self) -> str:
"""
Return VM state: 'running', 'shutoff', etc. Reference:
https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainState
"""
try:
# libvirt returns list [state: int, reason: int]
# https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainGetState
state = self.domain.state()[0]
except libvirt.libvirtError as err:
raise VMError(
f'Cannot fetch VM status vm={self.domain_name}: {err}') from err
STATES = {
libvirt.VIR_DOMAIN_NOSTATE: 'nostate',
libvirt.VIR_DOMAIN_RUNNING: 'running',
libvirt.VIR_DOMAIN_BLOCKED: 'blocked',
libvirt.VIR_DOMAIN_PAUSED: 'paused',
libvirt.VIR_DOMAIN_SHUTDOWN: 'shutdown',
libvirt.VIR_DOMAIN_SHUTOFF: 'shutoff',
libvirt.VIR_DOMAIN_CRASHED: 'crashed',
libvirt.VIR_DOMAIN_PMSUSPENDED: 'pmsuspended',
}
return STATES.get(state)
@property
def is_running(self) -> bool:
"""Return True if VM is running, else return False."""
if self.domain.isActive() != 1:
# inactive (0) or error (-1)
return False
return True
@property
def is_autostart(self) -> bool:
"""Return True if VM autostart is enabled, else return False."""
try:
if self.domain.autostart() == 1:
return True
return False
except libvirt.libvirtError as err:
raise VMError(
f'Cannot get autostart status vm={self.domain_name}: {err}'
) from err
def start(self) -> None:
"""Start defined VM."""
logger.info('Starting VM: vm=%s', self.domain_name)
if self.is_running:
logger.warning('VM vm=%s is already started, nothing to do',
self.domain_name)
return
try:
self.domain.create()
except libvirt.libvirtError as err:
raise VMError(
f'Cannot start vm={self.domain_name}: {err}') from err
def shutdown(self, method: str | None = None) -> None:
"""
Send signal to guest OS to shutdown. Supports several modes:
* GUEST_AGENT - use guest agent
* NORMAL - use method choosen by hypervisor to shutdown machine
* SIGTERM - send SIGTERM to QEMU process, destroy machine gracefully
* SIGKILL - send SIGKILL to QEMU process. May corrupt guest data!
If mode is not passed use 'NORMAL' mode.
"""
METHODS = {
'GUEST_AGENT': libvirt.VIR_DOMAIN_SHUTDOWN_GUEST_AGENT,
'NORMAL': libvirt.VIR_DOMAIN_SHUTDOWN_DEFAULT,
'SIGTERM': libvirt.VIR_DOMAIN_DESTROY_GRACEFUL,
'SIGKILL': libvirt.VIR_DOMAIN_DESTROY_DEFAULT
}
if method is None:
method = 'NORMAL'
if not isinstance(method, str):
raise ValueError(f"Mode must be a 'str', not {type(method)}")
if method.upper() not in METHODS:
raise ValueError(f"Unsupported mode: '{method}'")
try:
if method in ['GUEST_AGENT', 'NORMAL']:
self.domain.shutdownFlags(flags=METHODS.get(method))
elif method in ['SIGTERM', 'SIGKILL']:
self.domain.destroyFlags(flags=METHODS.get(method))
except libvirt.libvirtError as err:
raise VMError(f'Cannot shutdown vm={self.domain_name} with '
f'method={method}: {err}') from err
def reset(self) -> None:
"""
Copypaste from libvirt doc:
Reset a domain immediately without any guest OS shutdown.
Reset emulates the power reset button on a machine, where all
hardware sees the RST line set and reinitializes internal state.
Note that there is a risk of data loss caused by reset without any
guest OS shutdown.
"""
try:
self.domain.reset()
except libvirt.libvirtError as err:
raise VMError(
f'Cannot reset vm={self.domain_name}: {err}') from err
def reboot(self) -> None:
"""Send ACPI signal to guest OS to reboot. OS may ignore this."""
try:
self.domain.reboot()
except libvirt.libvirtError as err:
raise VMError(
f'Cannot reboot vm={self.domain_name}: {err}') from err
def set_autostart(self, enable: bool) -> None:
"""
Configure VM to be automatically started when the host machine boots.
"""
if enable:
autostart_flag = 1
else:
autostart_flag = 0
try:
self.domain.setAutostart(autostart_flag)
except libvirt.libvirtError as err:
raise VMError(f'Cannot set autostart vm={self.domain_name} '
f'autostart={autostart_flag}: {err}') from err
def set_vcpus(self, nvcpus: int, hotplug: bool = False):
"""
Set vCPUs for VM. If `hotplug` is True set vCPUs on running VM.
If VM is not running set `hotplug` to False. If `hotplug` is True
and VM is not currently running vCPUs will set in config and will
applied when machine boot.
NB: Note that if this call is executed before the guest has
finished booting, the guest may fail to process the change.
"""
if nvcpus == 0:
raise VMError(f'Cannot set zero vCPUs vm={self.domain_name}')
if hotplug and self.domain_info['state'] == libvirt.VIR_DOMAIN_RUNNING:
flags = (libvirt.VIR_DOMAIN_AFFECT_LIVE |
libvirt.VIR_DOMAIN_AFFECT_CONFIG)
else:
flags = libvirt.VIR_DOMAIN_AFFECT_CONFIG
try:
self.domain.setVcpusFlags(nvcpus, flags=flags)
except libvirt.libvirtError as err:
raise VMError(
f'Cannot set vCPUs for vm={self.domain_name}: {err}') from err
def set_memory(self, memory: int, hotplug: bool = False):
"""
Set momory for VM. `memory` must be passed in mebibytes. Internally
converted to kibibytes. If `hotplug` is True set memory for running
VM, else set memory in config and will applied when machine boot.
If `hotplug` is True and machine is not currently running set memory
in config.
"""
if memory == 0:
raise VMError(f'Cannot set zero memory vm={self.domain_name}')
if hotplug and self.domain_info['state'] == libvirt.VIR_DOMAIN_RUNNING:
flags = (libvirt.VIR_DOMAIN_AFFECT_LIVE |
libvirt.VIR_DOMAIN_AFFECT_CONFIG)
else:
flags = libvirt.VIR_DOMAIN_AFFECT_CONFIG
try:
self.domain.setMemoryFlags(memory * 1024,
libvirt.VIR_DOMAIN_MEM_MAXIMUM)
self.domain.setMemoryFlags(memory * 1024, flags=flags)
except libvirt.libvirtError as err:
raise VMError(
f'Cannot set memory for vm={self.domain_name} {memory=}: {err}') from err
def attach_device(self, device_info: 'DeviceInfo', hotplug: bool = False):
if hotplug and self.domain_info['state'] == libvirt.VIR_DOMAIN_RUNNING:
flags = (libvirt.VIR_DOMAIN_AFFECT_LIVE |
libvirt.VIR_DOMAIN_AFFECT_CONFIG)
else:
flags = libvirt.VIR_DOMAIN_AFFECT_CONFIG
self.domain.attachDeviceFlags(device_info.to_xml(), flags=flags)
def detach_device(self, device_info: 'DeviceInfo', hotplug: bool = False):
if hotplug and self.domain_info['state'] == libvirt.VIR_DOMAIN_RUNNING:
flags = (libvirt.VIR_DOMAIN_AFFECT_LIVE |
libvirt.VIR_DOMAIN_AFFECT_CONFIG)
else:
flags = libvirt.VIR_DOMAIN_AFFECT_CONFIG
self.domain.detachDeviceFlags(device_info.to_xml(), flags=flags)
def resize_volume(self, vol_info: VolumeInfo, online: bool = False):
# Этот метод должен принимать описание волюма и в зависимости от
# флага online вызывать virStorageVolResize или virDomainBlockResize
# https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainBlockResize
pass
def list_ssh_keys(self, user: str):
pass
def set_ssh_keys(self, user: str):
pass
def remove_ssh_keys(self, user: str):
pass
def set_user_password(self, user: str, password: str) -> None:
self.domain.setUserPassword(user, password)
def dump_xml(self) -> str:
return self.domain.XMLDesc()
def delete(self, delete_volumes: bool = False) -> None:
"""Undefine VM."""
self.shutdown(method='SIGTERM')
self.domain.undefine()
# todo: delete local volumes

View File

@ -0,0 +1,2 @@
from .storage_pool import StoragePool
from .volume import DiskInfo, Volume, VolumeInfo

View File

@ -0,0 +1,70 @@
import logging
from collections import namedtuple
import libvirt
from lxml import etree
from ..exceptions import StoragePoolError
from .volume import Volume, VolumeInfo
logger = logging.getLogger(__name__)
class StoragePool:
def __init__(self, pool: libvirt.virStoragePool):
self.pool = pool
@property
def name(self) -> str:
return self.pool.name()
@property
def path(self) -> str:
xml = etree.fromstring(self.pool.XMLDesc())
return xml.xpath('/pool/target/path/text()')[0]
@property
def usage(self) -> 'StoragePoolUsage':
xml = etree.fromstring(self.pool.XMLDesc())
StoragePoolUsage = namedtuple('StoagePoolUsage',
['capacity', 'allocation', 'available'])
return StoragePoolUsage(
capacity=int(xml.xpath('/pool/capacity/text()')[0])
allocation=int(xml.xpath('/pool/allocation/text()')[0])
available=int(xml.xpath('/pool/available/text()')[0]))
def dump_xml(self) -> str:
return self.pool.XMLDesc()
def refresh(self) -> None:
self.pool.refresh()
def create_volume(self, vol_info: VolumeInfo) -> Volume:
"""
Create storage volume and return Volume instance.
"""
logger.info('Create storage volume vol=%s in pool=%s',
vol_info.name, self.pool)
vol = self.pool.createXML(
vol_info.to_xml(),
flags=libvirt.VIR_STORAGE_VOL_CREATE_PREALLOC_METADATA)
return Volume(self.pool, vol)
def get_volume(self, name: str) -> Volume | None:
"""Lookup and return Volume instance or None."""
logger.info('Lookup for storage volume vol=%s in pool=%s',
name, self.pool.name)
try:
vol = self.pool.storageVolLookupByName(name)
return Volume(self.pool, vol)
except libvirt.libvirtError as err:
if (err.get_error_domain() == libvirt.VIR_FROM_STORAGE or
err.get_error_code() == libvirt.VIR_ERR_NO_STORAGE_VOL):
logger.error(err.get_error_message())
return None
logger.error('libvirt error: %s' err)
raise StoragePoolError(f'libvirt error: {err}') from err
def list_volumes(self) -> list[Volume]:
return [Volume(self.pool, vol) for vol in self.pool.listAllVolumes()]

View File

@ -0,0 +1,80 @@
from dataclasses import dataclass
from time import time
import libvirt
from lxml import etree
from lxml.builder import E
@dataclass
class VolumeInfo:
name: str
path: str
capacity: int
def to_xml(self) -> str:
unixtime = str(int(time()))
xml = E.volume(type='file')
xml.append(E.name(self.name))
xml.append(E.key(self.path))
xml.append(E.source())
xml.append(E.capacity(str(self.capacity * 1024 * 1024), unit='bytes'))
xml.append(E.allocation('0'))
xml.append(E.target(
E.path(self.path),
E.format(type='qcow2'),
E.timestamps(
E.atime(unixtime),
E.mtime(unixtime),
E.ctime(unixtime)),
E.compat('1.1'),
E.features(E.lazy_refcounts())
))
return etree.tostring(xml, encoding='unicode', pretty_print=True)
@dataclass
class DiskInfo:
target: str
path: str
readonly: bool = False
def to_xml(self) -> str:
xml = E.disk(type='file', device='disk')
xml.append(E.driver(name='qemu', type='qcow2', cache='writethrough'))
xml.append(E.source(file=self.path))
xml.append(E.target(dev=self.target, bus='virtio'))
if self.readonly:
xml.append(E.readonly())
return etree.tostring(xml, encoding='unicode', pretty_print=True)
class Volume:
def __init__(self, pool: libvirt.virStoragePool,
vol: libvirt.virStorageVol):
self.pool = pool
self.vol = vol
@property
def name(self) -> str:
return self.vol.name()
@property
def path(self) -> str:
return self.vol.path()
def dump_xml(self) -> str:
return self.vol.XMLDesc()
def clone(self, vol_info: VolumeInfo) -> None:
self.pool.createXMLFrom(
vol_info.to_xml(),
self.vol,
flags=libvirt.VIR_STORAGE_VOL_CREATE_PREALLOC_METADATA)
def resize(self, capacity: int):
"""Resize volume to `capacity`. Unit is mebibyte."""
self.vol.resize(capacity * 1024 * 1024)
def delete(self) -> None:
self.vol.delete()