This commit is contained in:
ge
2023-08-27 23:42:56 +03:00
parent 91478b8122
commit e8133af392
17 changed files with 722 additions and 341 deletions

View File

@ -1,3 +1,5 @@
from .exceptions import *
from .ga import QemuAgent
from .main import VirtualMachine
from .guest_agent import QemuAgent
from .virtual_machine import VirtualMachine
from .installer import VirtualMachineInstaller
from .hardware import vCPUMode, vCPUTopology

View File

@ -1,22 +1,31 @@
import libvirt
from .exceptions import VMError, VMNotFound
from .exceptions import VMError
class VirtualMachineBase:
def __init__(self, session: 'LibvirtSession', name: str):
self.domname = name
self.session = session.session # virConnect object
self.config = session.config # ConfigLoader object
self.domain = self._get_domain(name)
def __init__(self, domain: libvirt.virDomain):
self.domain = domain
self.domain_name = self._get_domain_name()
self.domain_info = self._get_domain_info()
def _get_domain(self, name: str) -> libvirt.virDomain:
"""Get virDomain object by name to manipulate with domain."""
def _get_domain_name(self):
try:
domain = self.session.lookupByName(name)
if domain is not None:
return domain
raise VMNotFound(name)
return self.domain.name()
except libvirt.libvirtError as err:
raise VMError(err) from err
raise VMError(f'Cannot get domain name: {err}') from err
def _get_domain_info(self):
# https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainInfo
try:
info = self.domain.info()
return {
'state': info[0],
'max_memory': info[1],
'memory': info[2],
'nproc': info[3],
'cputime': info[4]
}
except libvirt.libvirtError as err:
raise VMError(f'Cannot get domain info: {err}') from err

View File

@ -8,7 +8,7 @@ class VMError(Exception):
class VMNotFound(Exception):
def __init__(self, domain, message='VM not found: {domain}'):
def __init__(self, domain, message='VM not found vm={domain}'):
self.domain = domain
self.message = message.format(domain=domain)
super().__init__(self.message)

View File

@ -12,8 +12,9 @@ from .exceptions import QemuAgentError
logger = logging.getLogger(__name__)
QEMU_TIMEOUT = 60 # seconds
POLL_INTERVAL = 0.3 # also seconds
# Note that if no QEMU_TIMEOUT libvirt cannot connect to agent
QEMU_TIMEOUT = 60 # in seconds
POLL_INTERVAL = 0.3 # also in seconds
class QemuAgent(VirtualMachineBase):
@ -28,12 +29,9 @@ class QemuAgent(VirtualMachineBase):
must be passed as string. Wraps execute() method.
"""
def __init__(self,
session: 'LibvirtSession',
name: str,
timeout: int | None = None,
def __init__(self, domain: libvirt.virDomain, timeout: int | None = None,
flags: int | None = None):
super().__init__(session, name)
super().__init__(domain)
self.timeout = timeout or QEMU_TIMEOUT # timeout for guest agent
self.flags = flags or libvirt_qemu.VIR_DOMAIN_QEMU_MONITOR_COMMAND_DEFAULT
@ -110,7 +108,11 @@ class QemuAgent(VirtualMachineBase):
)
def _execute(self, command: dict):
logging.debug('Execute command: vm=%s cmd=%s', self.domname, command)
logging.debug('Execute command: vm=%s cmd=%s', self.domain_name,
command)
if self.domain_info['state'] != libvirt.VIR_DOMAIN_RUNNING:
raise GuestAgentError(
f'Cannot execute command: vm={self.domain_name} is not running')
try:
return libvirt_qemu.qemuAgentCommand(
self.domain, # virDomain object
@ -119,7 +121,9 @@ class QemuAgent(VirtualMachineBase):
self.flags,
)
except libvirt.libvirtError as err:
raise QemuAgentError(err) from err
raise QemuAgentError(
f'Cannot execute command on vm={self.domain_name}: {err}'
) from err
def _get_cmd_result(
self, pid: int, decode_output: bool = False, wait: bool = True,
@ -131,7 +135,8 @@ class QemuAgent(VirtualMachineBase):
output = json.loads(self._execute(cmd))
return self._return_tuple(output, decode=decode_output)
logger.debug('Start polling command pid=%s', pid)
logger.debug('Start polling command pid=%s on vm=%s', pid,
self.domain_name)
start_time = time()
while True:
output = json.loads(self._execute(cmd))
@ -141,10 +146,12 @@ class QemuAgent(VirtualMachineBase):
now = time()
if now - start_time > timeout:
raise QemuAgentError(
f'Polling command pid={pid} took longer than {timeout} seconds.'
f'Polling command pid={pid} on vm={self.domain_name} '
f'took longer than {timeout} seconds.'
)
logger.debug('Polling command pid=%s finished, time taken: %s seconds',
pid, int(time() - start_time))
logger.debug('Polling command pid=%s on vm=%s finished, '
'time taken: %s seconds',
pid, self.domain_name, int(time() - start_time))
return self._return_tuple(output, decode=decode_output)
def _return_tuple(self, output: dict, decode: bool = False):

81
node_agent/vm/hardware.py Normal file
View File

@ -0,0 +1,81 @@
import textwrap
from enum import Enum
from collections import UserDict
import libvirt
from lxml.etree import SubElement, fromstring, tostring
class Boot(Enum):
BIOS = 'bios'
UEFI = 'uefi'
class vCPUMode(Enum):
HOST_MODEL = 'host-model'
HOST_PASSTHROUGTH = 'host-passthrougth'
CUSTOM = 'custom'
MAXIMUM = 'maximum'
class DomainCapabilities:
def __init__(self, session: libvirt.virConnect):
self.session = session
self.domcaps = fromstring(
self.session.getDomainCapabilities())
@property
def arch(self):
return self.domcaps.xpath('/domainCapabilities/arch')[0].text
@property
def virttype(self):
return self.domcaps.xpath('/domainCapabilities/domain')[0].text
@property
def emulator(self):
return self.domcaps.xpath('/domainCapabilities/path')[0].text
@property
def machine(self):
return self.domcaps.xpath('/domainCapabilities/machine')[0].text
def best_cpu(self, mode: vCPUMode) -> str:
"""
See https://libvirt.org/html/libvirt-libvirt-host.html
#virConnectBaselineHypervisorCPU
"""
cpus = self.domcaps.xpath(
f'/domainCapabilities/cpu/mode[@name="{mode}"]')[0]
cpus.tag = 'cpu'
for attr in cpus.attrib.keys():
del cpus.attrib[attr]
arch = SubElement(cpus, 'arch')
arch.text = self.arch
xmlcpus = tostring(cpus, encoding='unicode', pretty_print=True)
xml = self.session.baselineHypervisorCPU(self.emulator,
self.arch, self.machine, self.virttype, [xmlcpus])
return textwrap.indent(xml, ' ' * 2)
class vCPUTopology(UserDict):
"""
CPU topology schema ``{'sockets': 1, 'cores': 4, 'threads': 1}``::
<topology sockets='1' dies='1' cores='4' threads='1'/>
"""
def __init__(self, topology: dict):
super().__init__(self._validate(topology))
def _validate(self, topology: dict):
if isinstance(topology, dict):
if ['sockets', 'cores', 'threads'] != list(topology.keys()):
raise ValueError("Topology must have 'sockets', 'cores' "
"and 'threads' keys.")
for key in topology.keys():
if not isinstance(topology[key], int):
raise TypeError(f"Key '{key}' must be 'int'")
return topology
raise TypeError("Topology must be a 'dict'")

View File

@ -0,0 +1,94 @@
import re
import libvirt
from ..utils.xml import Constructor
from ..utils.mac import random_mac
from .hardware import DomainCapabilities, vCPUMode, vCPUTopology, Boot
class vCPUInfo:
pass
class ImageVolume:
pass
class CloudInitConfig:
pass
class BootOrder:
pass
class VirtualMachineInstaller:
def __init__(self, session: libvirt.virConnect):
self.session = session
self.info = {}
def install(
self,
name: str | None = None,
title: str | None = None,
description: str = '',
os: str | None = None,
image: ImageVolume | None = None,
volumes: list['VolumeInfo'] | None = None,
vcpus: int = 0,
vcpu_info: vCPUInfo | None = None,
vcpu_mode: vCPUMode | None = None,
vcpu_topology: vCPUTopology | None = None,
memory: int = 0,
boot: Boot = Boot.BIOS,
boot_menu: bool = False,
boot_order: BootOrder = ('cdrom', 'hd'),
cloud_init: CloudInitConfig | None = None):
"""
Install virtual machine with passed parameters.
"""
domcaps = DomainCapabilities(self.session.session)
name = self._validate_name(name)
if vcpu_topology is None:
vcpu_topology = vCPUTopology(
{'sockets': 1, 'cores': vcpus, 'threads': 1})
self._validate_topology(vcpus, vcpu_topology)
if vcpu_info is None:
if not vcpu_mode:
vcpu_mode = vCPUMode.CUSTOM.value
xml_cpu = domcaps.best_cpu(vcpu_mode)
else:
raise NotImplementedError('Custom CPU not implemented')
xml_domain = Constructor().gen_domain_xml(
name=name,
title=title if title else name,
desc=description if description else '',
vcpus=vcpus,
memory=memory,
domain_type='hvm',
machine=domcaps.machine,
arch=domcaps.arch,
boot_order=('cdrom', 'hd'),
cpu=xml_cpu,
mac=random_mac()
)
return xml_domain
def _validate_name(self, name):
if name is None:
raise ValueError("'name' cannot be empty")
if isinstance(name, str):
if not re.match(r"^[a-z0-9_]+$", name, re.I):
raise ValueError(
"'name' can contain only letters, numbers "
"and underscore.")
return name.lower()
raise TypeError(f"'name' must be 'str', not {type(name)}")
def _validate_topology(self, vcpus, topology):
sockets = topology['sockets']
cores = topology['cores']
threads = topology['threads']
if sockets * cores * threads == vcpus:
return
raise ValueError("CPU topology must match the number of 'vcpus'")
def _define(self, xml: str):
self.session.defineXML(xml)

View File

@ -13,7 +13,7 @@ class VirtualMachine(VirtualMachineBase):
@property
def name(self):
return self.domname
return self.domain_name
@property
def status(self) -> str:
@ -27,7 +27,7 @@ class VirtualMachine(VirtualMachineBase):
state = self.domain.state()[0]
except libvirt.libvirtError as err:
raise VMError(
f'Cannot fetch VM status vm={self.domname}: {err}') from err
f'Cannot fetch VM status vm={self.domain_name}: {err}') from err
STATES = {
libvirt.VIR_DOMAIN_NOSTATE: 'nostate',
libvirt.VIR_DOMAIN_RUNNING: 'running',
@ -57,20 +57,21 @@ class VirtualMachine(VirtualMachineBase):
return False
except libvirt.libvirtError as err:
raise VMError(
f'Cannot get autostart status vm={self.domname}: {err}'
f'Cannot get autostart status vm={self.domain_name}: {err}'
) from err
def start(self) -> None:
"""Start defined VM."""
logger.info('Starting VM: vm=%s', self.domname)
logger.info('Starting VM: vm=%s', self.domain_name)
if self.is_running:
logger.debug('VM vm=%s is already started, nothing to do',
self.domname)
logger.warning('VM vm=%s is already started, nothing to do',
self.domain_name)
return
try:
self.domain.create()
except libvirt.libvirtError as err:
raise VMError(f'Cannot start vm={self.domname}: {err}') from err
raise VMError(
f'Cannot start vm={self.domain_name}: {err}') from err
def shutdown(self, mode: str | None = None) -> None:
"""
@ -78,7 +79,7 @@ class VirtualMachine(VirtualMachineBase):
* GUEST_AGENT - use guest agent
* NORMAL - use method choosen by hypervisor to shutdown machine
* SIGTERM - send SIGTERM to QEMU process, destroy machine gracefully
* SIGKILL - send SIGKILL, this option may corrupt guest data!
* SIGKILL - send SIGKILL to QEMU process. May corrupt guest data!
If mode is not passed use 'NORMAL' mode.
"""
MODES = {
@ -90,7 +91,7 @@ class VirtualMachine(VirtualMachineBase):
if mode is None:
mode = 'NORMAL'
if not isinstance(mode, str):
raise ValueError(f'Mode must be a string, not {type(mode)}')
raise ValueError(f"Mode must be a 'str', not {type(mode)}")
if mode.upper() not in MODES:
raise ValueError(f"Unsupported mode: '{mode}'")
try:
@ -99,7 +100,7 @@ class VirtualMachine(VirtualMachineBase):
elif mode in ['SIGTERM', 'SIGKILL']:
self.domain.destroyFlags(flags=MODES.get(mode))
except libvirt.libvirtError as err:
raise VMError(f'Cannot shutdown vm={self.domname} with '
raise VMError(f'Cannot shutdown vm={self.domain_name} with '
f'mode={mode}: {err}') from err
def reset(self) -> None:
@ -116,16 +117,18 @@ class VirtualMachine(VirtualMachineBase):
try:
self.domain.reset()
except libvirt.libvirtError as err:
raise VMError(f'Cannot reset vm={self.domname}: {err}') from err
raise VMError(
f'Cannot reset vm={self.domain_name}: {err}') from err
def reboot(self) -> None:
"""Send ACPI signal to guest OS to reboot. OS may ignore this."""
try:
self.domain.reboot()
except libvirt.libvirtError as err:
raise VMError(f'Cannot reboot vm={self.domname}: {err}') from err
raise VMError(
f'Cannot reboot vm={self.domain_name}: {err}') from err
def autostart(self, enable: bool) -> None:
def set_autostart(self, enable: bool) -> None:
"""
Configure VM to be automatically started when the host machine boots.
"""
@ -136,13 +139,57 @@ class VirtualMachine(VirtualMachineBase):
try:
self.domain.setAutostart(autostart_flag)
except libvirt.libvirtError as err:
raise VMError(f'Cannot set autostart vm={self.domname} '
raise VMError(f'Cannot set autostart vm={self.domain_name} '
f'autostart={autostart_flag}: {err}') from err
def set_vcpus(self, count: int):
def set_vcpus(self, nvcpus: int, hotplug: bool = False):
"""
Set vCPUs for VM. If `hotplug` is True set vCPUs on running VM.
If VM is not running set `hotplug` to False. If `hotplug` is True
and VM is not currently running vCPUs will set in config and will
applied when machine boot.
NB: Note that if this call is executed before the guest has
finished booting, the guest may fail to process the change.
"""
if nvcpus == 0:
raise VMError(f'Cannot set zero vCPUs vm={self.domain_name}')
if hotplug and self.domain_info['state'] == libvirt.VIR_DOMAIN_RUNNING:
flags = (libvirt.VIR_DOMAIN_AFFECT_LIVE +
libvirt.VIR_DOMAIN_AFFECT_CONFIG)
else:
flags = libvirt.VIR_DOMAIN_AFFECT_CONFIG
try:
self.domain.setVcpusFlags(nvcpus, flags=flags)
except libvirt.libvirtError as err:
raise VMError(
f'Cannot set vCPUs for vm={self.domain_name}: {err}') from err
def set_memory(self, memory: int, hotplug: bool = False):
"""
Set momory for VM. `memory` must be passed in mebibytes. Internally
converted to kibibytes. If `hotplug` is True set memory for running
VM, else set memory in config and will applied when machine boot.
If `hotplug` is True and machine is not currently running set memory
in config.
"""
if memory == 0:
raise VMError(f'Cannot set zero memory vm={self.domain_name}')
if hotplug and self.domain_info['state'] == libvirt.VIR_DOMAIN_RUNNING:
flags = (libvirt.VIR_DOMAIN_AFFECT_LIVE +
libvirt.VIR_DOMAIN_AFFECT_CONFIG)
else:
flags = libvirt.VIR_DOMAIN_AFFECT_CONFIG
try:
self.domain.setVcpusFlags(memory * 1024, flags=flags)
except libvirt.libvirtError as err:
raise VMError(
f'Cannot set memory for vm={self.domain_name}: {err}') from err
def attach_device(self, device: str):
pass
def set_ram(self, count: int):
def detach_device(self, device: str):
pass
def list_ssh_keys(self, user: str):
@ -156,3 +203,11 @@ class VirtualMachine(VirtualMachineBase):
def set_user_password(self, user: str):
pass
def dump_xml(self) -> str:
return self.domain.XMLDesc()
def delete(self, delete_volumes: bool = False):
"""Undefine VM."""
self.shutdown(method='SIGTERM')
self.domain.undefine()