This commit is contained in:
ge 2023-08-31 20:37:41 +03:00
parent e8133af392
commit 118c4c376a
18 changed files with 409 additions and 437 deletions

View File

@ -4,7 +4,7 @@ Manage virtual machines.
Usage: na-vmctl [options] status <machine>
na-vmctl [options] is-running <machine>
na-vmctl [options] start <machine>
na-vmctl [options] shutdown <machine> [-f|--force] [-9|--sigkill]
na-vmctl [options] shutdown <machine>
na-vmctl [options] set-vcpus <machine> <nvcpus>
na-vmctl [options] set-memory <machine> <memory>
na-vmctl [options] list [-a|--all]
@ -31,6 +31,9 @@ from ..vm import VirtualMachine, VMError, VMNotFound
logger = logging.getLogger(__name__)
levels = logging.getLevelNamesMapping()
# Supress libvirt errors
libvirt.registerErrorHandler(lambda userdata, err: None, ctx=None)
class Color:
RED = '\033[31m'
@ -89,19 +92,17 @@ def cli():
if loglvl in levels:
logging.basicConfig(level=levels[loglvl])
with LibvirtSession(config) as session:
with LibvirtSession() as session:
try:
if args['list']:
vms = session.list_domains()
table = Table()
table.header(['NAME', 'STATE', 'AUTOSTART'])
for vm_ in vms:
vm_ = VirtualMachine(vm_)
for vm_ in session.list_machines():
table.row([vm_.name, vm_.status, vm_.is_autostart])
table.print()
sys.exit()
vm = VirtualMachine(session, machine)
vm = session.get_machine(machine)
if args['status']:
print(vm.status)
if args['is-running']:
@ -113,7 +114,7 @@ def cli():
vm.start()
print(f'{vm.name} started')
if args['shutdown']:
vm.shutdown(force=args['--force'], sigkill=args['sigkill'])
vm.shutdown('NORMAL')
except VMNotFound as nferr:
sys.exit(f'{Color.RED}VM {machine} not found.{Color.NONE}')
except VMError as vmerr:

View File

@ -14,15 +14,19 @@ import logging
import pathlib
import sys
import libvirt
from docopt import docopt
from ..session import LibvirtSession
from ..vm import QemuAgent, QemuAgentError, VMNotFound
from ..vm import GuestAgent, GuestAgentError, VMNotFound
logger = logging.getLogger(__name__)
levels = logging.getLevelNamesMapping()
# Supress libvirt errors
libvirt.registerErrorHandler(lambda userdata, err: None, ctx=None)
class Color:
RED = '\033[31m'
@ -45,16 +49,16 @@ def cli():
if loglvl in levels:
logging.basicConfig(level=levels[loglvl])
with LibvirtSession(config) as session:
with LibvirtSession() as session:
shell = args['--shell']
cmd = args['<command>']
try:
ga = QemuAgent(session, machine)
ga = session.get_guest_agent(machine)
exited, exitcode, stdout, stderr = ga.shellexec(
cmd, executable=shell, capture_output=True, decode_output=True,
timeout=int(args['--timeout']))
except QemuAgentError as qemuerr:
except GuestAgentError as qemuerr:
errmsg = f'{Color.RED}{qemuerr}{Color.NONE}'
if str(qemuerr).startswith('Polling command pid='):
errmsg = (errmsg + Color.YELLOW +

View File

@ -21,7 +21,7 @@ class ConfigLoader(UserDict):
super().__init__(self._load())
# todo: load deafult configuration
def _load(self):
def _load(self) -> dict:
try:
with open(self.file, 'rb') as config:
return tomllib.load(config)
@ -34,5 +34,5 @@ class ConfigLoader(UserDict):
raise ConfigLoaderError(
f'Cannot read config file: {self.file}: {readerr}') from readerr
def reload(self):
def reload(self) -> None:
self.data = self._load()

View File

@ -1,9 +1,9 @@
from contextlib import AbstractContextManager
from pathlib import Path
import libvirt
from .config import ConfigLoader
from .vm import GuestAgent, VirtualMachine, VMNotFound
from .volume import StoragePool
class LibvirtSessionError(Exception):
@ -12,9 +12,8 @@ class LibvirtSessionError(Exception):
class LibvirtSession(AbstractContextManager):
def __init__(self, config: Path | None = None):
self.config = ConfigLoader(config)
self.session = self._connect(self.config['libvirt']['uri'])
def __init__(self, uri: str = 'qemu:///system'):
self.connection = self._connect(uri)
def __enter__(self):
return self
@ -22,30 +21,48 @@ class LibvirtSession(AbstractContextManager):
def __exit__(self, exception_type, exception_value, exception_traceback):
self.close()
def _connect(self, connection_uri: str):
def _connect(self, connection_uri: str) -> libvirt.virConnect:
try:
return libvirt.open(connection_uri)
except libvirt.libvirtError as err:
raise LibvirtSessionError(
f'Failed to open connection to the hypervisor: {err}') from err
def close(self) -> None:
self.session.close()
def list_domains(self):
return self.session.listAllDomains()
def get_domain(self, name: str) -> libvirt.virDomain:
def _get_domain(self, name: str) -> libvirt.virDomain:
try:
return self.session.lookupByName(name)
return self.connection.lookupByName(name)
except libvirt.libvirtError as err:
if err.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
raise VMNotFound(name)
else:
raise LibvirtSessionError(err)
raise VMNotFound(name) from err
raise LibvirtSessionError(err) from err
def get_storage_pool(self, name: str) -> libvirt.virStoragePool:
def _list_all_domains(self) -> list[libvirt.virDomain]:
try:
return self.session.storagePoolLookupByName(name)
return self.connection.listAllDomains()
except libvirt.libvirtError as err:
raise LibvirtSessionError(err)
raise LibvirtSessionError(err) from err
def _get_storage_pool(self, name: str) -> libvirt.virStoragePool:
try:
return self.connection.storagePoolLookupByName(name)
except libvirt.libvirtError as err:
raise LibvirtSessionError(err) from err
def get_machine(self, name: str) -> VirtualMachine:
return VirtualMachine(self._get_domain(name))
def list_machines(self) -> list[VirtualMachine]:
return [VirtualMachine(dom) for dom in self._list_all_domains()]
def get_guest_agent(self, name: str, timeout: int | None = None,
flags: int | None = None) -> GuestAgent:
return GuestAgent(self._get_domain(name), timeout, flags)
def get_storage_pool(self, name: str) -> StoragePool:
return StoragePool(self._get_storage_pool(name))
def list_storage_pools(self):
return [StoragePool(p) for p in self.connection.listStoragePools()]
def close(self) -> None:
self.connection.close()

View File

@ -1 +1 @@
from .mac import *
from . import mac, xml

View File

@ -1,212 +0,0 @@
from pathlib import Path
from lxml.builder import E
from lxml.etree import Element, QName, SubElement, tostring, fromstring
XPATH_DOM_NAME = '/domain/name'
XPATH_DOM_TITLE = '/domain/title'
XPATH_DOM_DESCRIPTION = '/domain/description'
XPATH_DOM_METADATA = '/domain/metadata'
XPATH_DOM_MEMORY = '/domain/memory'
XPATH_DOM_CURRENT_MEMORY = '/domain/currentMemory'
XPATH_DOM_VCPU = '/domain/vcpu'
XPATH_DOM_OS = '/domian/os'
XPATH_DOM_CPU = '/domain/cpu'
class Reader:
def __init__(xml: str):
self.xml = xml
self.el = fromstring(self.xml)
def get_domcaps_machine(self):
return self.el.xpath('/domainCapabilities/machine')[0].text
def get_domcaps_cpus(self):
# mode can be: custom, host-model, host-passthrough
return self.el.xpath('/domainCapabilities/cpu/mode[@name="custom"]')[0]
class Constructor:
"""
The XML constructor. This class builds XML configs for libvirt.
Features:
- Generate basic virtual machine XML. See gen_domain_xml()
- Generate virtual disk XML. See gen_volume_xml()
- Add arbitrary metadata to XML from special structured dict
"""
def __init__(self, xml: str | None = None):
self.xml_string = xml
self.xml = None
@property
def domain_xml(self):
return self.xml
def gen_domain_xml(self,
name: str,
title: str,
vcpus: int,
vcpu_vendor: str,
vcpu_model: str,
mac_addr: str,
memory: int,
volume: Path,
vcpu_features: dict | None = None,
desc: str = "") -> None:
"""
Generate default domain XML configuration for virtual machines.
See https://lxml.de/tutorial.html#the-e-factory for details.
"""
domain_xml = E.domain(
E.name(name),
E.title(title),
E.description(desc),
E.metadata(),
E.memory(str(memory), unit='MB'),
E.currentMemory(str(memory), unit='MB'),
E.vcpu(str(vcpus), placement='static'),
E.os(
E.type('hvm', arch='x86_64'),
E.boot(dev='cdrom'),
E.boot(dev='hd'),
),
E.features(
E.acpi(),
E.apic(),
),
E.cpu(
E.vendor(vcpu_vendor),
E.model(vcpu_model, fallback='forbid'),
E.topology(sockets='1', dies='1', cores=str(vcpus),
threads='1'),
mode='custom',
match='exact',
check='partial',
),
E.on_poweroff('destroy'),
E.on_reboot('restart'),
E.on_crash('restart'),
E.pm(
E('suspend-to-mem', enabled='no'),
E('suspend-to-disk', enabled='no'),
),
E.devices(
E.emulator('/usr/bin/qemu-system-x86_64'),
E.disk(
E.driver(name='qemu', type='qcow2', cache='writethrough'),
E.source(file=volume),
E.target(dev='vda', bus='virtio'),
type='file',
device='disk',
),
E.interface(
E.source(network='default'),
E.mac(address=mac_addr),
type='network',
),
E.graphics(
E.listen(type='address'),
type='vnc', port='-1', autoport='yes'
),
E.video(
E.model(type='vga', vram='16384', heads='1', primary='yes'),
E.address(type='pci', domain='0x0000', bus='0x00',
slot='0x02', function='0x0'),
),
),
type='kvm',
)
return self.to_string(domain_xml)
def gen_volume_xml(self,
device_name: str,
file: Path,
bus: str = 'virtio',
cache: str = 'writethrough',
disktype: str = 'file'):
disk_xml = E.disk(E.driver(name='qemu', type='qcow2', cache=cache),
E.source(file=file),
E.target(dev=device_name, bus=bus),
type=disktype,
device='disk')
return self.to_string(disk_xml)
def add_volume(self):
raise NotImplementedError()
def add_meta(self, data: dict, namespace: str, nsprefix: str) -> None:
"""
Add metadata to domain. See:
https://libvirt.org/formatdomain.html#general-metadata
"""
metadata = metadata_old = self.xml.xpath('/domain/metadata')[0]
metadata.append(
self.construct_xml(
data,
namespace=namespace,
nsprefix=nsprefix,
))
self.xml.replace(metadata_old, metadata)
def remove_meta(self, namespace: str):
"""Remove metadata by namespace."""
raise NotImplementedError()
def construct_xml(self,
tag: dict,
namespace: str | None = None,
nsprefix: str | None = None,
root: Element = None) -> Element:
"""
Shortly this recursive function transforms dictonary to XML.
Return etree.Element built from dict with following structure::
{
'name': 'device', # tag name
'text': '', # optional key
'values': { # optional key, must be a dict of key-value pairs
'type': 'disk'
},
children: [] # optional key, must be a list of dicts
}
Child elements must have the same structure. Infinite `children` nesting
is allowed.
"""
use_ns = False
if isinstance(namespace, str) and isinstance(nsprefix, str):
use_ns = True
# Create element
if root is None:
if use_ns:
element = Element(QName(namespace, tag['name']),
nsmap={nsprefix: namespace})
else:
element = Element(tag['name'])
else:
if use_ns:
element = SubElement(root, QName(namespace, tag['name']))
else:
element = SubElement(root, tag['name'])
# Fill up element with content
if 'text' in tag.keys():
element.text = tag['text']
if 'values' in tag.keys():
for key in tag['values'].keys():
element.set(str(key), str(tag['values'][key]))
if 'children' in tag.keys():
for child in tag['children']:
element.append(
self.construct_xml(child,
namespace=namespace,
nsprefix=nsprefix,
root=element))
return element
def to_string(self):
return (tostring(self.xml, pretty_print=True,
encoding='utf-8').decode().strip())

View File

@ -8,9 +8,3 @@ def random_mac() -> str:
random.randint(0x00, 0xff),
random.randint(0x00, 0xff)]
return ':'.join(map(lambda x: "%02x" % x, mac))
def unique_mac() -> str:
"""Return non-conflicting MAC address."""
# todo: see virtinst.DeviceInterface.generate_mac
raise NotImplementedError()

View File

@ -1,7 +1,5 @@
from pathlib import Path
from lxml.builder import E
from lxml.etree import Element, QName, SubElement, tostring, fromstring
from lxml.etree import Element, QName, SubElement, fromstring, tostring
class Constructor:
@ -41,9 +39,9 @@ class Constructor:
devices = E.devices()
devices.append(E.emulator('/usr/bin/qemu-system-x86_64'))
devices.append(E.interface(
E.source(network='default'),
E.mac(address=mac),
type='network')
E.source(network='default'),
E.mac(address=mac),
type='network')
)
devices.append(E.graphics(type='vnc', port='-1', autoport='yes'))
devices.append(E.input(type='tablet', bus='usb'))
@ -63,15 +61,85 @@ class Constructor:
domain.append(devices)
return tostring(domain, encoding='unicode', pretty_print=True).strip()
def gen_volume_xml(self, dev: str, mode: str) -> str:
def gen_volume_xml(self, dev: str, mode: str, path: str) -> str:
"""
Todo: No hardcode
https://libvirt.org/formatdomain.html#hard-drives-floppy-disks-cdroms
"""
volume = E.disk(type='file', device='disk')
volume.append(E.driver(name='qemu', type='qcow2', cache='writethrough'))
volume.append(
E.driver(
name='qemu',
type='qcow2',
cache='writethrough'))
volume.append(E.source(file=path))
volume.append(E.target(dev=dev, bus='virtio'))
if mode.lower() == 'ro':
volume.append(E.readonly())
return tostring(volume, encoding='unicode', pretty_print=True).strip()
def construct_xml(self,
tag: dict,
namespace: str | None = None,
nsprefix: str | None = None,
root: Element = None) -> Element:
"""
Shortly this recursive function transforms dictonary to XML.
Return etree.Element built from dict with following structure::
{
'name': 'device', # tag name
'text': '', # optional key
'values': { # optional key, must be a dict of key-value pairs
'type': 'disk'
},
children: [] # optional key, must be a list of dicts
}
Child elements must have the same structure. Infinite `children` nesting
is allowed.
"""
use_ns = False
if isinstance(namespace, str) and isinstance(nsprefix, str):
use_ns = True
# Create element
if root is None:
if use_ns:
element = Element(QName(namespace, tag['name']),
nsmap={nsprefix: namespace})
else:
element = Element(tag['name'])
else:
if use_ns:
element = SubElement(root, QName(namespace, tag['name']))
else:
element = SubElement(root, tag['name'])
# Fill up element with content
if 'text' in tag.keys():
element.text = tag['text']
if 'values' in tag.keys():
for key in tag['values'].keys():
element.set(str(key), str(tag['values'][key]))
if 'children' in tag.keys():
for child in tag['children']:
element.append(
self.construct_xml(child,
namespace=namespace,
nsprefix=nsprefix,
root=element))
return element
def add_meta(self, xml: Element, data: dict,
namespace: str, nsprefix: str) -> None:
"""
Add metadata to domain. See:
https://libvirt.org/formatdomain.html#general-metadata
"""
metadata = metadata_old = xml.xpath('/domain/metadata')[0]
metadata.append(
self.construct_xml(
data,
namespace=namespace,
nsprefix=nsprefix,
))
xml.replace(metadata_old, metadata)

View File

@ -1,5 +1,4 @@
from .exceptions import *
from .guest_agent import QemuAgent
from .guest_agent import GuestAgent
from .installer import CPUMode, CPUTopology, VirtualMachineInstaller
from .virtual_machine import VirtualMachine
from .installer import VirtualMachineInstaller
from .hardware import vCPUMode, vCPUTopology

View File

@ -17,7 +17,6 @@ class VirtualMachineBase:
raise VMError(f'Cannot get domain name: {err}') from err
def _get_domain_info(self):
# https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainInfo
try:
info = self.domain.info()
return {

View File

@ -1,4 +1,4 @@
class QemuAgentError(Exception):
class GuestAgentError(Exception):
"""Mostly QEMU Guest Agent is not responding."""

View File

@ -7,17 +7,16 @@ import libvirt
import libvirt_qemu
from .base import VirtualMachineBase
from .exceptions import QemuAgentError
from .exceptions import GuestAgentError
logger = logging.getLogger(__name__)
# Note that if no QEMU_TIMEOUT libvirt cannot connect to agent
QEMU_TIMEOUT = 60 # in seconds
POLL_INTERVAL = 0.3 # also in seconds
class QemuAgent(VirtualMachineBase):
class GuestAgent(VirtualMachineBase):
"""
Interacting with QEMU guest agent. Methods:
@ -47,7 +46,7 @@ class QemuAgent(VirtualMachineBase):
Execute command on guest and return output if `capture_output` is True.
See https://wiki.qemu.org/Documentation/QMP for QEMU commands reference.
If `wait` is True poll guest command output with POLL_INTERVAL. Raise
QemuAgentError on `timeout` reached (in seconds).
GuestAgentError on `timeout` reached (in seconds).
Return values:
tuple(
exited: bool | None,
@ -121,9 +120,9 @@ class QemuAgent(VirtualMachineBase):
self.flags,
)
except libvirt.libvirtError as err:
raise QemuAgentError(
f'Cannot execute command on vm={self.domain_name}: {err}'
) from err
raise GuestAgentError(
f'Cannot execute command on vm={self.domain_name}: {err}'
) from err
def _get_cmd_result(
self, pid: int, decode_output: bool = False, wait: bool = True,
@ -145,7 +144,7 @@ class QemuAgent(VirtualMachineBase):
sleep(POLL_INTERVAL)
now = time()
if now - start_time > timeout:
raise QemuAgentError(
raise GuestAgentError(
f'Polling command pid={pid} on vm={self.domain_name} '
f'took longer than {timeout} seconds.'
)

View File

@ -1,81 +0,0 @@
import textwrap
from enum import Enum
from collections import UserDict
import libvirt
from lxml.etree import SubElement, fromstring, tostring
class Boot(Enum):
BIOS = 'bios'
UEFI = 'uefi'
class vCPUMode(Enum):
HOST_MODEL = 'host-model'
HOST_PASSTHROUGTH = 'host-passthrougth'
CUSTOM = 'custom'
MAXIMUM = 'maximum'
class DomainCapabilities:
def __init__(self, session: libvirt.virConnect):
self.session = session
self.domcaps = fromstring(
self.session.getDomainCapabilities())
@property
def arch(self):
return self.domcaps.xpath('/domainCapabilities/arch')[0].text
@property
def virttype(self):
return self.domcaps.xpath('/domainCapabilities/domain')[0].text
@property
def emulator(self):
return self.domcaps.xpath('/domainCapabilities/path')[0].text
@property
def machine(self):
return self.domcaps.xpath('/domainCapabilities/machine')[0].text
def best_cpu(self, mode: vCPUMode) -> str:
"""
See https://libvirt.org/html/libvirt-libvirt-host.html
#virConnectBaselineHypervisorCPU
"""
cpus = self.domcaps.xpath(
f'/domainCapabilities/cpu/mode[@name="{mode}"]')[0]
cpus.tag = 'cpu'
for attr in cpus.attrib.keys():
del cpus.attrib[attr]
arch = SubElement(cpus, 'arch')
arch.text = self.arch
xmlcpus = tostring(cpus, encoding='unicode', pretty_print=True)
xml = self.session.baselineHypervisorCPU(self.emulator,
self.arch, self.machine, self.virttype, [xmlcpus])
return textwrap.indent(xml, ' ' * 2)
class vCPUTopology(UserDict):
"""
CPU topology schema ``{'sockets': 1, 'cores': 4, 'threads': 1}``::
<topology sockets='1' dies='1' cores='4' threads='1'/>
"""
def __init__(self, topology: dict):
super().__init__(self._validate(topology))
def _validate(self, topology: dict):
if isinstance(topology, dict):
if ['sockets', 'cores', 'threads'] != list(topology.keys()):
raise ValueError("Topology must have 'sockets', 'cores' "
"and 'threads' keys.")
for key in topology.keys():
if not isinstance(topology[key], int):
raise TypeError(f"Key '{key}' must be 'int'")
return topology
raise TypeError("Topology must be a 'dict'")

View File

@ -1,77 +1,159 @@
import re
import textwrap
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from uuid import UUID
import libvirt
from lxml.etree import SubElement, fromstring, tostring
from ..utils.xml import Constructor
from ..utils.mac import random_mac
from .hardware import DomainCapabilities, vCPUMode, vCPUTopology, Boot
from ..utils import mac, xml
class vCPUInfo:
pass
class CPUMode(Enum):
HOST_MODEL = 'host-model'
HOST_PASSTHROUGH = 'host-passthrough'
CUSTOM = 'custom'
MAXIMUM = 'maximum'
class ImageVolume:
pass
@classmethod
def default(cls):
return cls.HOST_MODEL
@dataclass
class CPUTopology:
sockets: int
cores: int
threads: int
def validate(self, vcpus: int) -> None:
if self.sockets * self.cores * self.threads == vcpus:
return
raise ValueError("CPU topology must match the number of 'vcpus'")
@dataclass
class CPUInfo:
vendor: str
model: str
required_features: list[str]
disabled_features: list[str]
@dataclass
class VolumeInfo:
name: str
path: Path
capacity: int
@dataclass
class CloudInitConfig:
pass
user_data: str = ''
meta_data: str = ''
class Boot(Enum):
BIOS = 'bios'
UEFI = 'uefi'
@classmethod
def default(cls):
return cls.BIOS
@dataclass
class BootMenu:
enabled: bool = False
timeout: int = 3000
class BootOrder:
pass
class VirtualMachineInstaller:
def __init__(self, session: libvirt.virConnect):
self.session = session
self.info = {}
def __init__(self, session: 'LibvirtSession'):
self.connection = session.connection # libvirt.virConnect object
self.domcaps = fromstring(
self.connection.getDomainCapabilities())
self.arch = self.domcaps.xpath('/domainCapabilities/arch/text()')[0]
self.virttype = self.domcaps.xpath(
'/domainCapabilities/domain/text()')[0]
self.emulator = self.domcaps.xpath(
'/domainCapabilities/path/text()')[0]
self.machine = self.domcaps.xpath(
'/domainCapabilities/machine/text()')[0]
def install(
self,
name: str | None = None,
title: str | None = None,
description: str = '',
os: str | None = None,
image: ImageVolume | None = None,
volumes: list['VolumeInfo'] | None = None,
vcpus: int = 0,
vcpu_info: vCPUInfo | None = None,
vcpu_mode: vCPUMode | None = None,
vcpu_topology: vCPUTopology | None = None,
memory: int = 0,
boot: Boot = Boot.BIOS,
boot_menu: bool = False,
boot_order: BootOrder = ('cdrom', 'hd'),
cloud_init: CloudInitConfig | None = None):
self,
name: str | None = None,
title: str | None = None,
description: str = '',
os: str | None = None,
image: UUID | None = None,
volumes: list['VolumeInfo'] | None = None,
vcpus: int = 0,
vcpu_info: CPUInfo | None = None,
vcpu_mode: CPUMode = CPUMode.default(),
vcpu_topology: CPUTopology | None = None,
memory: int = 0,
boot: Boot = Boot.default(),
boot_menu: BootMenu = BootMenu(),
boot_order: tuple[str] = ('cdrom', 'hd'),
cloud_init: CloudInitConfig | None = None):
"""
Install virtual machine with passed parameters.
If no `vcpu_info` is None select best CPU wich can be provided by
hypervisor. Choosen CPU depends on `vcpu_mode`, default is 'custom'.
See CPUMode for more info. Default `vcpu_topology` is: 1 socket,
`vcpus` cores, 1 threads.
`memory` must be integer value in mebibytes e.g. 4094 MiB = 4 GiB.
Volumes must be passed as list of VolumeInfo objects. Minimum one
volume is required.
"""
domcaps = DomainCapabilities(self.session.session)
name = self._validate_name(name)
if vcpu_topology is None:
vcpu_topology = vCPUTopology(
{'sockets': 1, 'cores': vcpus, 'threads': 1})
self._validate_topology(vcpus, vcpu_topology)
vcpu_topology = CPUTopology(sockets=1, cores=vcpus, threads=1)
vcpu_topology.validate(vcpus)
if vcpu_info is None:
if not vcpu_mode:
vcpu_mode = vCPUMode.CUSTOM.value
xml_cpu = domcaps.best_cpu(vcpu_mode)
vcpu_mode = CPUMode.CUSTOM.value
xml_cpu = self._choose_best_cpu(vcpu_mode)
else:
raise NotImplementedError('Custom CPU not implemented')
xml_domain = Constructor().gen_domain_xml(
xml_domain = xml.Constructor().gen_domain_xml(
name=name,
title=title if title else name,
desc=description if description else '',
vcpus=vcpus,
# vcpu_topology=vcpu_topology,
# vcpu_info=vcpu_info,
memory=memory,
domain_type='hvm',
machine=domcaps.machine,
arch=domcaps.arch,
boot_order=('cdrom', 'hd'),
machine=self.machine,
arch=self.arch,
# boot_menu=boot_menu,
boot_order=boot_order,
cpu=xml_cpu,
mac=random_mac()
mac=mac.random_mac()
)
xml_volume = xml.Constructor().gen_volume_xml(
dev='vda', mode='rw', path='')
virconn = self.connection
virstor = virconn.storagePoolLookupByName('default')
# Мб использовать storageVolLookupByPath вместо поиска по имени
etalon_volume = virstor.storageVolLookupByName('debian_bookworm.qcow2')
return xml_domain
def _validate_name(self, name):
def _validate_name(self, name) -> str:
if name is None:
raise ValueError("'name' cannot be empty")
if isinstance(name, str):
@ -82,13 +164,27 @@ class VirtualMachineInstaller:
return name.lower()
raise TypeError(f"'name' must be 'str', not {type(name)}")
def _validate_topology(self, vcpus, topology):
sockets = topology['sockets']
cores = topology['cores']
threads = topology['threads']
if sockets * cores * threads == vcpus:
return
raise ValueError("CPU topology must match the number of 'vcpus'")
def _choose_best_cpu(self, mode: CPUMode) -> str:
if mode == 'host-passthrough':
xml = '<cpu mode="host-passthrough" migratable="on"/>'
elif mode == 'maximum':
xml = '<cpu mode="maximum" migratable="on"/>'
elif mode in ['host-model', 'custom']:
cpus = self.domcaps.xpath(
f'/domainCapabilities/cpu/mode[@name="{mode}"]')[0]
cpus.tag = 'cpu'
for attr in cpus.attrib.keys():
del cpus.attrib[attr]
arch = SubElement(cpus, 'arch')
arch.text = self.arch
xmlcpus = tostring(cpus, encoding='unicode', pretty_print=True)
xml = self.connection.baselineHypervisorCPU(
self.emulator, self.arch, self.machine, self.virttype, [xmlcpus])
else:
raise ValueError(
f'CPU mode must be in {[v.value for v in CPUMode]}, '
f"but passed '{mode}'")
return textwrap.indent(xml, ' ' * 2)
def _define(self, xml: str):
self.session.defineXML(xml)
def _define(self, xml: str) -> None:
self.connection.defineXML(xml)

View File

@ -2,6 +2,7 @@ import logging
import libvirt
from ..volume import VolumeInfo
from .base import VirtualMachineBase
from .exceptions import VMError
@ -65,7 +66,7 @@ class VirtualMachine(VirtualMachineBase):
logger.info('Starting VM: vm=%s', self.domain_name)
if self.is_running:
logger.warning('VM vm=%s is already started, nothing to do',
self.domain_name)
self.domain_name)
return
try:
self.domain.create()
@ -73,7 +74,7 @@ class VirtualMachine(VirtualMachineBase):
raise VMError(
f'Cannot start vm={self.domain_name}: {err}') from err
def shutdown(self, mode: str | None = None) -> None:
def shutdown(self, method: str | None = None) -> None:
"""
Send signal to guest OS to shutdown. Supports several modes:
* GUEST_AGENT - use guest agent
@ -82,26 +83,26 @@ class VirtualMachine(VirtualMachineBase):
* SIGKILL - send SIGKILL to QEMU process. May corrupt guest data!
If mode is not passed use 'NORMAL' mode.
"""
MODES = {
METHODS = {
'GUEST_AGENT': libvirt.VIR_DOMAIN_SHUTDOWN_GUEST_AGENT,
'NORMAL': libvirt.VIR_DOMAIN_SHUTDOWN_DEFAULT,
'SIGTERM': libvirt.VIR_DOMAIN_DESTROY_GRACEFUL,
'SIGKILL': libvirt.VIR_DOMAIN_DESTROY_DEFAULT
}
if mode is None:
mode = 'NORMAL'
if not isinstance(mode, str):
raise ValueError(f"Mode must be a 'str', not {type(mode)}")
if mode.upper() not in MODES:
raise ValueError(f"Unsupported mode: '{mode}'")
if method is None:
method = 'NORMAL'
if not isinstance(method, str):
raise ValueError(f"Mode must be a 'str', not {type(method)}")
if method.upper() not in METHODS:
raise ValueError(f"Unsupported mode: '{method}'")
try:
if mode in ['GUEST_AGENT', 'NORMAL']:
self.domain.shutdownFlags(flags=MODES.get(mode))
elif mode in ['SIGTERM', 'SIGKILL']:
self.domain.destroyFlags(flags=MODES.get(mode))
if method in ['GUEST_AGENT', 'NORMAL']:
self.domain.shutdownFlags(flags=METHODS.get(method))
elif method in ['SIGTERM', 'SIGKILL']:
self.domain.destroyFlags(flags=METHODS.get(method))
except libvirt.libvirtError as err:
raise VMError(f'Cannot shutdown vm={self.domain_name} with '
f'mode={mode}: {err}') from err
f'method={method}: {err}') from err
def reset(self) -> None:
"""
@ -186,10 +187,26 @@ class VirtualMachine(VirtualMachineBase):
raise VMError(
f'Cannot set memory for vm={self.domain_name}: {err}') from err
def attach_device(self, device: str):
pass
def attach_device(self, dev_xml: str, hotplug: bool = False):
if hotplug and self.domain_info['state'] == libvirt.VIR_DOMAIN_RUNNING:
flags = (libvirt.VIR_DOMAIN_AFFECT_LIVE +
libvirt.VIR_DOMAIN_AFFECT_CONFIG)
else:
flags = libvirt.VIR_DOMAIN_AFFECT_CONFIG
self.domain.attachDeviceFlags(dev_xml, flags=flags)
def detach_device(self, device: str):
def detach_device(self, dev_xml: str, hotplug: bool = False):
if hotplug and self.domain_info['state'] == libvirt.VIR_DOMAIN_RUNNING:
flags = (libvirt.VIR_DOMAIN_AFFECT_LIVE +
libvirt.VIR_DOMAIN_AFFECT_CONFIG)
else:
flags = libvirt.VIR_DOMAIN_AFFECT_CONFIG
self.domain.detachDeviceFlags(dev_xml, flags=flags)
def resize_volume(self, vol_info: VolumeInfo, online: bool = False):
# Этот метод должен принимать описание волюма и в зависимости от
# флага online вызывать virStorageVolResize или virDomainBlockResize
# https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainBlockResize
pass
def list_ssh_keys(self, user: str):
@ -201,8 +218,8 @@ class VirtualMachine(VirtualMachineBase):
def remove_ssh_keys(self, user: str):
pass
def set_user_password(self, user: str):
pass
def set_user_password(self, user: str, password: str):
self.domain.setUserPassword(user, password)
def dump_xml(self) -> str:
return self.domain.XMLDesc()

View File

@ -0,0 +1,2 @@
from .storage_pool import StoragePool
from .volume import Volume, VolumeInfo

View File

@ -1,9 +1,37 @@
import libvirt
from .volume import Volume, VolumeInfo
class StoragePool:
def __init__(self, pool: libvirt.virStoragePool):
self.pool = pool
def create_volume(self):
@property
def name(self) -> str:
return self.pool.name()
def dump_xml(self) -> str:
return self.pool.XMLDesc()
def create(self):
pass
def delete(self):
pass
def refresh(self) -> None:
self.pool.refresh()
def create_volume(self, vol_info: VolumeInfo) -> None:
# todo: return Volume object?
self.pool.createXML(
vol_info.to_xml(),
flags=libvirt.VIR_STORAGE_VOL_CREATE_PREALLOC_METADATA)
def get_volume(self, name: str) -> Volume:
vol = self.pool.storageVolLookupByName(name)
return Volume(self.pool, vol)
def list_volumes(self) -> list[Volume]:
return [Volume(self.pool, vol) for vol in self.pool.listAllVolumes()]

View File

@ -1,23 +1,64 @@
from dataclasses import dataclass
from time import time
import libvirt
from lxml.builder import E
from lxml.etree import tostring
@dataclass
class VolumeInfo:
"""
Volume info schema
{'type': 'local', 'system': True, 'size': 102400, 'mode': 'rw'}
"""
pass
name: str
path: str
capacity: int
def to_xml(self) -> str:
unixtime = str(int(time()))
xml = E.volume(type='file')
xml.append(E.name(self.name))
xml.append(E.key(self.path))
xml.append(E.source())
xml.append(E.capacity(str(self.capacity * 1024 * 1024), unit='bytes'))
xml.append(E.allocation('0'))
xml.append(E.target(
E.path(self.path),
E.format(type='qcow2'),
E.timestamps(
E.atime(unixtime),
E.mtime(unixtime),
E.ctime(unixtime)),
E.compat('1.1'),
E.features(E.lazy_refcounts())
))
return tostring(xml, encoding='unicode', pretty_print=True)
class Volume:
def __init__(self, pool: libvirt.virStorageVol):
def __init__(self, pool: libvirt.virStoragePool,
vol: libvirt.virStorageVol):
self.pool = pool
self.vol = vol
def lookup_by_path(self):
pass
@property
def name(self) -> str:
return self.vol.name()
def generate_xml(self):
pass
@property
def path(self) -> str:
return self.vol.path()
def create(self):
pass
def dump_xml(self) -> str:
return self.vol.XMLDesc()
def clone(self, vol_info: VolumeInfo) -> None:
self.pool.createXMLFrom(
vol_info.to_xml(),
self.vol,
flags=libvirt.VIR_STORAGE_VOL_CREATE_PREALLOC_METADATA)
def resize(self, capacity: int):
"""Resize volume to `capacity`. Unit is mebibyte."""
self.vol.resize(capacity * 1024 * 1024)
def delete(self) -> None:
self.vol.delete()