various improvements

This commit is contained in:
ge
2023-11-23 02:34:02 +03:00
parent b9d089dd78
commit 05f90b14f2
200 changed files with 15968 additions and 84 deletions

View File

@ -0,0 +1,81 @@
Metadata-Version: 2.1
Name: compute
Version: 0.1.0.dev1
Summary: Compute instances management library and tools
Author: ge
Author-email: ge@nixhacks.net
Requires-Python: >=3.11,<4.0
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Requires-Dist: libvirt-python (==9.0.0)
Requires-Dist: lxml (>=4.9.2,<5.0.0)
Requires-Dist: pydantic (==1.10.4)
Requires-Dist: pyyaml (>=6.0.1,<7.0.0)
Description-Content-Type: text/markdown
# Compute
Compute instances management library and tools.
## Docs
Run `make serve-docs`. See [Development](#development) below.
## Roadmap
- [x] Create instances
- [ ] CDROM
- [ ] cloud-init for provisioning instances
- [x] Instance power management
- [x] Instance pause and resume
- [x] vCPU hotplug
- [x] Memory hotplug
- [x] Hot disk resize [not tested]
- [ ] CPU topology customization
- [x] CPU customization (emulation mode, model, vendor, features)
- [ ] BIOS/UEFI settings
- [x] Device attaching
- [x] Device detaching
- [ ] GPU passthrough
- [ ] CPU guarantied resource percent support
- [x] QEMU Guest Agent management
- [ ] Instance resources usage stats
- [ ] SSH-keys management
- [x] Setting user passwords in guest
- [x] QCOW2 disks support
- [ ] ZVOL support
- [ ] Network disks support
- [ ] Images service integration (Images service is not implemented yet)
- [ ] Manage storage pools
- [ ] Idempotency
- [ ] CLI [in progress]
- [ ] HTTP API
- [ ] Instance migrations
- [ ] Instance snapshots
- [ ] Instance backups
- [ ] LXC
## Development
Python 3.11+ is required.
Install [poetry](https://python-poetry.org/), clone this repository and run:
```
poetry install --with dev --with docs
```
# Build Debian package
Install Docker first, then run:
```
make build-deb
```
`compute` and `compute-doc` packages will built. See packaging/build directory. Packages can be installed via `dpkg` or `apt-get`:
```
apt-get install ./compute*.deb
```

View File

@ -0,0 +1,23 @@
../scripts/compute,sha256=b-Gj6H6ssfbGalpouUMSX5pmsjqDnN9xMdTwnU-UfZY,216
compute/__init__.py,sha256=x4zp_CoVPKgDT6AqhometspAyinGxJUXO48duJ5aHUM,873
compute/__main__.py,sha256=zJyKJul6pCbguFPtVLZBoAuZl9RXibn4CCMn46jIgUQ,745
compute/cli/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
compute/cli/control.py,sha256=83wnR21pHOPyyk1i1n_YBIDz6dCFB6hmuIFguIk68rs,14634
compute/common.py,sha256=G1qwC1EybG5LEJtyoux9ymiqB2ZOsgKXlCpbuhHv55Y,948
compute/exceptions.py,sha256=Ga59L55qSAPeyDfjANPuMh4yVSRWHDYi9xqq5o4_7-0,2452
compute/instance/__init__.py,sha256=kHN8jVamyrBZYZgi62tPtJ7rS73gUPhfswLalmPA5Zs,772
compute/instance/guest_agent.py,sha256=fq89kQbcV5X5eFCsMmujRuwTOSghWO4ZhAjvxyUu84M,7018
compute/instance/instance.py,sha256=WP6oTJfdAf6QlefwVLqdC8J6XoKHum6nZhwwHOEtjNk,23297
compute/instance/schemas.py,sha256=B51ytPlxhnx0MrkR2WYhd49RaRT7Is7NsIM9OrMUpvI,4288
compute/session.py,sha256=znYOIzoiCbSG62k-ViaXti_lOnw88wD8Syp3nCXAJ28,10050
compute/storage/__init__.py,sha256=zNaVjZ2925DxrVUFWwVRsGU6bSYbF46sb4L6NsaiKbw,736
compute/storage/pool.py,sha256=9z99bBDbb4ATGpfMkEWpxAO4fEQHNVOxxf0iUln9cN0,4197
compute/storage/volume.py,sha256=_TbK9Y4d3NAeknPUiuhldAT3ZaN1sZgjy4QzC-Sw4Io,4110
compute/utils/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
compute/utils/config_loader.py,sha256=ul1J3sZg0D9R0HbOz5Pg9JmL4nFaMahAzQEdGaWFABU,1989
compute/utils/ids.py,sha256=fg6Xsg4OMM-BIaU3DPu0L91ICwx-L3qNoELEwQZz2s0,1007
compute/utils/units.py,sha256=UkwD0zQ-rlpSpkbfezCcvJx4D8iZlI9M-oXXvdVEvy0,1549
compute-0.1.0.dev1.dist-info/METADATA,sha256=tbX8xp92Jwqf44sOwPB-HqKHLezab5dU9DrQDYFitDQ,1944
compute-0.1.0.dev1.dist-info/WHEEL,sha256=vVCvjcmxuUltf8cYhJ0sJMRDLr1XsPuxEId8YDzbyCY,88
compute-0.1.0.dev1.dist-info/entry_points.txt,sha256=xHhg-Fo9Z5gJnIahbG8pVIGNDqlH5Eordn8hnXUwscw,51
compute-0.1.0.dev1.dist-info/RECORD,,

View File

@ -0,0 +1,4 @@
Wheel-Version: 1.0
Generator: poetry-core 1.4.0
Root-Is-Purelib: true
Tag: py3-none-any

View File

@ -0,0 +1,3 @@
[console_scripts]
compute=compute.cli.control:cli

View File

@ -0,0 +1,22 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Compute instances management library."""
__version__ = '0.1.0-dev1'
from .instance import Instance, InstanceConfig, InstanceSchema
from .session import Session
from .storage import StoragePool, Volume, VolumeConfig

View File

@ -0,0 +1,21 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Command line interface for compute module."""
from compute.cli import main
main.cli()

View File

@ -0,0 +1,501 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Command line interface."""
import argparse
import io
import logging
import os
import shlex
import sys
from collections import UserDict
from typing import Any
from uuid import uuid4
import libvirt
import yaml
from pydantic import ValidationError
from compute import __version__
from compute.exceptions import ComputeError, GuestAgentTimeoutExceededError
from compute.instance import GuestAgent
from compute.session import Session
from compute.utils import ids
log = logging.getLogger(__name__)
log_levels = [lv.lower() for lv in logging.getLevelNamesMapping()]
libvirt.registerErrorHandler(
lambda userdata, err: None, # noqa: ARG005
ctx=None,
)
class Table:
"""Minimalistic text table constructor."""
def __init__(self, whitespace: str | None = None):
"""Initialise Table."""
self.whitespace = whitespace or '\t'
self.header = []
self.rows = []
self.table = ''
def add_row(self, row: list) -> None:
"""Add table row."""
self.rows.append([str(col) for col in row])
def add_rows(self, rows: list[list]) -> None:
"""Add multiple rows."""
for row in rows:
self.add_row(row)
def __str__(self) -> str:
"""Build table and return."""
widths = [max(map(len, col)) for col in zip(*self.rows, strict=True)]
self.rows.insert(0, [str(h).upper() for h in self.header])
for row in self.rows:
self.table += self.whitespace.join(
(
val.ljust(width)
for val, width in zip(row, widths, strict=True)
)
)
self.table += '\n'
return self.table.strip()
def _list_instances(session: Session) -> None:
table = Table()
table.header = ['NAME', 'STATE']
for instance in session.list_instances():
table.add_row(
[
instance.name,
instance.get_status(),
]
)
print(table)
sys.exit()
def _exec_guest_agent_command(
session: Session, args: argparse.Namespace
) -> None:
instance = session.get_instance(args.instance)
ga = GuestAgent(instance.domain, timeout=args.timeout)
arguments = args.arguments.copy()
if len(arguments) > 1 and not args.no_join_args:
arguments = [shlex.join(arguments)]
if not args.no_join_args:
arguments.insert(0, '-c')
stdin = None
if not sys.stdin.isatty():
stdin = sys.stdin.read()
try:
output = ga.guest_exec(
path=args.executable,
args=arguments,
env=args.env,
stdin=stdin,
capture_output=True,
decode_output=True,
poll=True,
)
except GuestAgentTimeoutExceededError as e:
sys.exit(
f'{e}. NOTE: command may still running in guest, '
f'PID={ga.last_pid}'
)
if output.stderr:
print(output.stderr.strip(), file=sys.stderr)
if output.stdout:
print(output.stdout.strip(), file=sys.stdout)
sys.exit(output.exitcode)
class _NotPresent:
"""
Type for representing non-existent dictionary keys.
See :class:`_FillableDict`.
"""
class _FillableDict(UserDict):
"""Use :method:`fill` to add key if not present."""
def __init__(self, data: dict):
self.data = data
def fill(self, key: str, value: Any) -> None: # noqa: ANN401
if self.data.get(key, _NotPresent) is _NotPresent:
self.data[key] = value
def _merge_dicts(a: dict, b: dict, path: list[str] | None = None) -> dict:
"""Merge `b` into `a`. Return modified `a`."""
if path is None:
path = []
for key in b:
if key in a:
if isinstance(a[key], dict) and isinstance(b[key], dict):
_merge_dicts(a[key], b[key], [path + str(key)])
elif a[key] == b[key]:
pass # same leaf value
else:
a[key] = b[key] # replace existing key's values
else:
a[key] = b[key]
return a
def _create_instance(session: Session, file: io.TextIOWrapper) -> None:
try:
data = _FillableDict(yaml.load(file.read(), Loader=yaml.SafeLoader))
log.debug('Read from file: %s', data)
except yaml.YAMLError as e:
sys.exit(f'error: cannot parse YAML: {e}')
capabilities = session.get_capabilities()
node_info = session.get_node_info()
data.fill('name', uuid4().hex)
data.fill('title', None)
data.fill('description', None)
data.fill('arch', capabilities.arch)
data.fill('machine', capabilities.machine)
data.fill('emulator', capabilities.emulator)
data.fill('max_vcpus', node_info.cpus)
data.fill('max_memory', node_info.memory)
data.fill('cpu', {})
cpu = {
'emulation_mode': 'host-passthrough',
'model': None,
'vendor': None,
'topology': None,
'features': None,
}
data['cpu'] = _merge_dicts(data['cpu'], cpu)
data.fill(
'network_interfaces',
[{'source': 'default', 'mac': ids.random_mac()}],
)
data.fill('boot', {'order': ['cdrom', 'hd']})
try:
log.debug('Input data: %s', data)
session.create_instance(**data)
except ValidationError as e:
for error in e.errors():
fields = '.'.join([str(lc) for lc in error['loc']])
print(
f"validation error: {fields}: {error['msg']}",
file=sys.stderr,
)
sys.exit()
def _shutdown_instance(session: Session, args: argparse.Namespace) -> None:
instance = session.get_instance(args.instance)
if args.soft:
method = 'SOFT'
elif args.hard:
method = 'HARD'
elif args.unsafe:
method = 'UNSAFE'
else:
method = 'NORMAL'
instance.shutdown(method)
def main(session: Session, args: argparse.Namespace) -> None:
"""Perform actions."""
match args.command:
case 'init':
_create_instance(session, args.file)
case 'exec':
_exec_guest_agent_command(session, args)
case 'ls':
_list_instances(session)
case 'start':
instance = session.get_instance(args.instance)
instance.start()
case 'shutdown':
_shutdown_instance(session, args)
case 'reboot':
instance = session.get_instance(args.instance)
instance.reboot()
case 'reset':
instance = session.get_instance(args.instance)
instance.reset()
case 'powrst':
instance = session.get_instance(args.instance)
instance.power_reset()
case 'pause':
instance = session.get_instance(args.instance)
instance.pause()
case 'resume':
instance = session.get_instance(args.instance)
instance.resume()
case 'status':
instance = session.get_instance(args.instance)
print(instance.status)
case 'setvcpus':
instance = session.get_instance(args.instance)
instance.set_vcpus(args.nvcpus, live=True)
case 'setmem':
instance = session.get_instance(args.instance)
instance.set_memory(args.memory, live=True)
case 'setpass':
instance = session.get_instance(args.instance)
instance.set_user_password(
args.username,
args.password,
encrypted=args.encrypted,
)
def cli() -> None: # noqa: PLR0915
"""Return command line arguments parser."""
root = argparse.ArgumentParser(
prog='compute',
description='manage compute instances',
formatter_class=argparse.RawTextHelpFormatter,
)
root.add_argument(
'-v',
'--verbose',
action='store_true',
default=False,
help='enable verbose mode',
)
root.add_argument(
'-c',
'--connect',
metavar='URI',
help='libvirt connection URI',
)
root.add_argument(
'-l',
'--log-level',
type=str.lower,
metavar='LEVEL',
choices=log_levels,
help='log level',
)
root.add_argument(
'-V',
'--version',
action='version',
version=__version__,
)
subparsers = root.add_subparsers(dest='command', metavar='COMMAND')
# init command
init = subparsers.add_parser(
'init', help='initialise instance using YAML config file'
)
init.add_argument(
'file',
type=argparse.FileType('r', encoding='UTF-8'),
nargs='?',
default='instance.yaml',
help='instance config [default: instance.yaml]',
)
# exec subcommand
execute = subparsers.add_parser(
'exec',
help='execute command in guest via guest agent',
description=(
'NOTE: any argument after instance name will be passed into '
'guest as shell command.'
),
)
execute.add_argument('instance')
execute.add_argument('arguments', nargs=argparse.REMAINDER)
execute.add_argument(
'-t',
'--timeout',
type=int,
default=60,
help=(
'waiting time in seconds for a command to be executed '
'in guest [default: 60]'
),
)
execute.add_argument(
'-x',
'--executable',
default='/bin/sh',
help='path to executable in guest [default: /bin/sh]',
)
execute.add_argument(
'-e',
'--env',
type=str,
nargs='?',
action='append',
help='environment variables to pass to executable in guest',
)
execute.add_argument(
'-n',
'--no-join-args',
action='store_true',
default=False,
help=(
"do not join arguments list and add '-c' option, suitable "
'for non-shell executables and other specific cases.'
),
)
# ls subcommand
listall = subparsers.add_parser('ls', help='list instances')
listall.add_argument(
'-a',
'--all',
action='store_true',
default=False,
help='list all instances including inactive',
)
# start subcommand
start = subparsers.add_parser('start', help='start instance')
start.add_argument('instance')
# shutdown subcommand
shutdown = subparsers.add_parser('shutdown', help='shutdown instance')
shutdown.add_argument('instance')
shutdown_opts = shutdown.add_mutually_exclusive_group()
shutdown_opts.add_argument(
'-s',
'--soft',
action='store_true',
help='normal guest OS shutdown, guest agent is used',
)
shutdown_opts.add_argument(
'-n',
'--normal',
action='store_true',
help='shutdown with hypervisor selected method [default]',
)
shutdown_opts.add_argument(
'-H',
'--hard',
action='store_true',
help=(
"gracefully destroy instance, it's like long "
'pressing the power button'
),
)
shutdown_opts.add_argument(
'-u',
'--unsafe',
action='store_true',
help=(
'destroy instance, this is similar to a power outage '
'and may result in data loss or corruption'
),
)
# reboot subcommand
reboot = subparsers.add_parser('reboot', help='reboot instance')
reboot.add_argument('instance')
# reset subcommand
reset = subparsers.add_parser('reset', help='reset instance')
reset.add_argument('instance')
# powrst subcommand
powrst = subparsers.add_parser('powrst', help='power reset instance')
powrst.add_argument('instance')
# pause subcommand
pause = subparsers.add_parser('pause', help='pause instance')
pause.add_argument('instance')
# resume subcommand
resume = subparsers.add_parser('resume', help='resume paused instance')
resume.add_argument('instance')
# status subcommand
status = subparsers.add_parser('status', help='display instance status')
status.add_argument('instance')
# setvcpus subcommand
setvcpus = subparsers.add_parser('setvcpus', help='set vCPU number')
setvcpus.add_argument('instance')
setvcpus.add_argument('nvcpus', type=int)
# setmem subcommand
setmem = subparsers.add_parser('setmem', help='set memory size')
setmem.add_argument('instance')
setmem.add_argument('memory', type=int, help='memory in MiB')
# setpass subcommand
setpass = subparsers.add_parser(
'setpass',
help='set user password in guest',
)
setpass.add_argument('instance')
setpass.add_argument('username')
setpass.add_argument('password')
setpass.add_argument(
'-e',
'--encrypted',
action='store_true',
default=False,
help='set it if password is already encrypted',
)
args = root.parse_args()
if args.command is None:
root.print_help()
sys.exit()
log_level = args.log_level or os.getenv('CMP_LOG')
if isinstance(log_level, str) and log_level.lower() in log_levels:
logging.basicConfig(
level=logging.getLevelNamesMapping()[log_level.upper()]
)
log.debug('CLI started with args: %s', args)
connect_uri = (
args.connect
or os.getenv('CMP_LIBVIRT_URI')
or os.getenv('LIBVIRT_DEFAULT_URI')
or 'qemu:///system'
)
try:
with Session(connect_uri) as session:
main(session, args)
except ComputeError as e:
sys.exit(f'error: {e}')
except KeyboardInterrupt:
sys.exit()
except SystemExit as e:
sys.exit(e)
except Exception as e: # noqa: BLE001
sys.exit(f'unexpected error {type(e)}: {e}')
if __name__ == '__main__':
cli()

View File

@ -0,0 +1,30 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Common symbols."""
from abc import ABC, abstractmethod
class EntityConfig(ABC):
"""An abstract entity XML config builder class."""
@abstractmethod
def to_xml(self) -> str:
"""Return device XML config."""
raise NotImplementedError
DeviceConfig = EntityConfig

View File

@ -0,0 +1,80 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Exceptions."""
class ComputeError(Exception):
"""Basic exception class."""
class ConfigLoaderError(ComputeError):
"""Something went wrong when loading configuration."""
class SessionError(ComputeError):
"""Something went wrong while connecting to libvirtd."""
class GuestAgentError(ComputeError):
"""Something went wring when QEMU Guest Agent call."""
class GuestAgentUnavailableError(GuestAgentError):
"""Guest agent is not connected or is unavailable."""
class GuestAgentTimeoutExceededError(GuestAgentError):
"""QEMU timeout exceeded."""
def __init__(self, msg: int):
"""Initialise GuestAgentTimeoutExceededError."""
super().__init__(f'QEMU timeout ({msg} sec) exceeded')
class GuestAgentCommandNotSupportedError(GuestAgentError):
"""Guest agent command is not supported or blacklisted on guest."""
class StoragePoolError(ComputeError):
"""Something went wrong when operating with storage pool."""
class StoragePoolNotFoundError(StoragePoolError):
"""Storage pool not found."""
def __init__(self, msg: str):
"""Initialise StoragePoolNotFoundError."""
super().__init__(f"storage pool named '{msg}' not found")
class VolumeNotFoundError(StoragePoolError):
"""Storage volume not found."""
def __init__(self, msg: str):
"""Initialise VolumeNotFoundError."""
super().__init__(f"storage volume '{msg}' not found")
class InstanceError(ComputeError):
"""Something went wrong while interacting with the domain."""
class InstanceNotFoundError(InstanceError):
"""Virtual machine or container not found on compute node."""
def __init__(self, msg: str):
"""Initialise InstanceNotFoundError."""
super().__init__(f"compute instance '{msg}' not found")

View File

@ -0,0 +1,18 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from .guest_agent import GuestAgent
from .instance import Instance, InstanceConfig
from .schemas import InstanceSchema

View File

@ -0,0 +1,208 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Interacting with the QEMU Guest Agent."""
import json
import logging
from base64 import b64decode, standard_b64encode
from time import sleep, time
from typing import NamedTuple
import libvirt
import libvirt_qemu
from compute.exceptions import (
GuestAgentCommandNotSupportedError,
GuestAgentError,
GuestAgentTimeoutExceededError,
GuestAgentUnavailableError,
)
log = logging.getLogger(__name__)
class GuestExecOutput(NamedTuple):
"""QEMU guest-exec command output."""
exited: bool | None = None
exitcode: int | None = None
stdout: str | None = None
stderr: str | None = None
class GuestAgent:
"""Class for interacting with QEMU guest agent."""
def __init__(self, domain: libvirt.virDomain, timeout: int = 60):
"""
Initialise GuestAgent.
:param domain: Libvirt domain object
:param timeout: QEMU timeout
"""
self.domain = domain
self.timeout = timeout
self.flags = libvirt_qemu.VIR_DOMAIN_QEMU_MONITOR_COMMAND_DEFAULT
self.last_pid = None
def execute(self, command: dict) -> dict:
"""
Execute QEMU guest agent command.
See: https://qemu-project.gitlab.io/qemu/interop/qemu-ga-ref.html
:param command: QEMU guest agent command as dict
:return: Command output
:rtype: dict
"""
log.debug(command)
try:
output = libvirt_qemu.qemuAgentCommand(
self.domain, json.dumps(command), self.timeout, self.flags
)
return json.loads(output)
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_AGENT_UNRESPONSIVE:
raise GuestAgentUnavailableError(e) from e
raise GuestAgentError(e) from e
def is_available(self) -> bool:
"""
Execute guest-ping.
:return: True or False if guest agent is unreachable.
:rtype: bool
"""
try:
if self.execute({'execute': 'guest-ping', 'arguments': {}}):
return True
except GuestAgentError:
return False
def get_supported_commands(self) -> set[str]:
"""Return set of supported guest agent commands."""
output = self.execute({'execute': 'guest-info', 'arguments': {}})
return {
cmd['name']
for cmd in output['return']['supported_commands']
if cmd['enabled'] is True
}
def raise_for_commands(self, commands: list[str]) -> None:
"""
Raise exception if QEMU GA command is not available.
:param commands: List of required commands
:raise: GuestAgentCommandNotSupportedError
"""
supported = self.get_supported_commands()
for command in commands:
if command not in supported:
raise GuestAgentCommandNotSupportedError(command)
def guest_exec( # noqa: PLR0913
self,
path: str,
args: list[str] | None = None,
env: list[str] | None = None,
stdin: str | None = None,
*,
capture_output: bool = False,
decode_output: bool = False,
poll: bool = False,
) -> GuestExecOutput:
"""
Execute qemu-exec command and return output.
:param path: Path ot executable on guest.
:param arg: List of arguments to pass to executable.
:param env: List of environment variables to pass to executable.
For example: ``['LANG=C', 'TERM=xterm']``
:param stdin: Data to pass to executable STDIN.
:param capture_output: Capture command output.
:param decode_output: Use base64_decode() to decode command output.
Affects only if `capture_output` is True.
:param poll: Poll command output. Uses `self.timeout` and
POLL_INTERVAL constant.
:return: Command output
:rtype: GuestExecOutput
"""
self.raise_for_commands(['guest-exec', 'guest-exec-status'])
command = {
'execute': 'guest-exec',
'arguments': {
'path': path,
**({'arg': args} if args else {}),
**({'env': env} if env else {}),
**(
{
'input-data': standard_b64encode(
stdin.encode('utf-8')
).decode('utf-8')
}
if stdin
else {}
),
'capture-output': capture_output,
},
}
output = self.execute(command)
self.last_pid = pid = output['return']['pid']
command_status = self.guest_exec_status(pid, poll=poll)['return']
exited = command_status['exited']
exitcode = command_status['exitcode']
stdout = command_status.get('out-data', None)
stderr = command_status.get('err-data', None)
if decode_output:
stdout = b64decode(stdout or '').decode('utf-8')
stderr = b64decode(stderr or '').decode('utf-8')
return GuestExecOutput(exited, exitcode, stdout, stderr)
def guest_exec_status(
self, pid: int, *, poll: bool = False, poll_interval: float = 0.3
) -> dict:
"""
Execute guest-exec-status and return output.
:param pid: PID in guest.
:param poll: If True poll command status.
:param poll_interval: Time between attempts to obtain command status.
:return: Command output
:rtype: dict
"""
self.raise_for_commands(['guest-exec-status'])
command = {
'execute': 'guest-exec-status',
'arguments': {'pid': pid},
}
if not poll:
return self.execute(command)
start_time = time()
while True:
command_status = self.execute(command)
if command_status['return']['exited']:
break
sleep(poll_interval)
now = time()
if now - start_time > self.timeout:
raise GuestAgentTimeoutExceededError(self.timeout)
log.debug(
'Polling command pid=%s finished, time taken: %s seconds',
pid,
int(time() - start_time),
)
return command_status

View File

@ -0,0 +1,675 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Manage compute instances."""
__all__ = ['Instance', 'InstanceConfig', 'InstanceInfo']
import logging
from typing import NamedTuple
import libvirt
from lxml import etree
from lxml.builder import E
from compute.common import DeviceConfig, EntityConfig
from compute.exceptions import (
GuestAgentCommandNotSupportedError,
InstanceError,
)
from compute.storage import DiskConfig
from compute.utils import units
from .guest_agent import GuestAgent
from .schemas import (
CPUEmulationMode,
CPUSchema,
InstanceSchema,
NetworkInterfaceSchema,
)
log = logging.getLogger(__name__)
class InstanceConfig(EntityConfig):
"""Compute instance XML config builder."""
def __init__(self, schema: InstanceSchema):
"""
Initialise InstanceConfig.
:param schema: InstanceSchema object
"""
self.name = schema.name
self.title = schema.title
self.description = schema.description
self.memory = schema.memory
self.max_memory = schema.max_memory
self.vcpus = schema.vcpus
self.max_vcpus = schema.max_vcpus
self.cpu = schema.cpu
self.machine = schema.machine
self.emulator = schema.emulator
self.arch = schema.arch
self.boot = schema.boot
self.network_interfaces = schema.network_interfaces
def _gen_cpu_xml(self, cpu: CPUSchema) -> etree.Element:
options = {
'mode': cpu.emulation_mode,
'match': 'exact',
'check': 'partial',
}
if cpu.emulation_mode == CPUEmulationMode.HOST_PASSTHROUGH:
options['check'] = 'none'
options['migratable'] = 'on'
xml = E.cpu(**options)
if cpu.model:
xml.append(E.model(cpu.model, fallback='forbid'))
if cpu.vendor:
xml.append(E.vendor(cpu.vendor))
if cpu.topology:
xml.append(
E.topology(
sockets=str(cpu.topology.sockets),
dies=str(cpu.topology.dies),
cores=str(cpu.topology.cores),
threads=str(cpu.topology.threads),
)
)
if cpu.features:
for feature in cpu.features.require:
xml.append(E.feature(policy='require', name=feature))
for feature in cpu.features.disable:
xml.append(E.feature(policy='disable', name=feature))
return xml
def _gen_vcpus_xml(self, vcpus: int, max_vcpus: int) -> etree.Element:
xml = E.vcpus()
xml.append(E.vcpu(id='0', enabled='yes', hotpluggable='no', order='1'))
for i in range(max_vcpus - 1):
enabled = 'yes' if (i + 2) <= vcpus else 'no'
xml.append(
E.vcpu(
id=str(i + 1),
enabled=enabled,
hotpluggable='yes',
order=str(i + 2),
)
)
return xml
def _gen_network_interface_xml(
self, interface: NetworkInterfaceSchema
) -> etree.Element:
return E.interface(
E.source(network=interface.source),
E.mac(address=interface.mac),
type='network',
)
def to_xml(self) -> str:
"""Return XML config for libvirt."""
xml = E.domain(type='kvm')
xml.append(E.name(self.name))
if self.title:
xml.append(E.title(self.title))
if self.description:
xml.append(E.description(self.description))
xml.append(E.metadata())
xml.append(E.memory(str(self.max_memory * 1024), unit='KiB'))
xml.append(E.currentMemory(str(self.memory * 1024), unit='KiB'))
xml.append(
E.vcpu(
str(self.max_vcpus),
placement='static',
current=str(self.vcpus),
)
)
xml.append(self._gen_cpu_xml(self.cpu))
os = E.os(E.type('hvm', machine=self.machine, arch=self.arch))
for dev in self.boot.order:
os.append(E.boot(dev=dev))
xml.append(os)
xml.append(E.features(E.acpi(), E.apic()))
xml.append(E.on_poweroff('destroy'))
xml.append(E.on_reboot('restart'))
xml.append(E.on_crash('restart'))
xml.append(
E.pm(
E('suspend-to-mem', enabled='no'),
E('suspend-to-disk', enabled='no'),
)
)
devices = E.devices()
devices.append(E.emulator(str(self.emulator)))
for interface in self.network_interfaces:
devices.append(self._gen_network_interface_xml(interface))
devices.append(E.graphics(type='vnc', port='-1', autoport='yes'))
devices.append(E.input(type='tablet', bus='usb'))
devices.append(
E.channel(
E.source(mode='bind'),
E.target(type='virtio', name='org.qemu.guest_agent.0'),
E.address(
type='virtio-serial', controller='0', bus='0', port='1'
),
type='unix',
)
)
devices.append(
E.console(E.target(type='serial', port='0'), type='pty')
)
devices.append(
E.video(
E.model(type='vga', vram='16384', heads='1', primary='yes')
)
)
xml.append(devices)
return etree.tostring(xml, encoding='unicode', pretty_print=True)
class InstanceInfo(NamedTuple):
"""
Store compute instance info.
Reference:
https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainInfo
"""
state: str
max_memory: int
memory: int
nproc: int
cputime: int
class Instance:
"""Manage compute instances."""
def __init__(self, domain: libvirt.virDomain):
"""
Initialise Instance.
:ivar libvirt.virDomain domain: domain object
:ivar libvirt.virConnect connection: connection object
:ivar str name: domain name
:ivar GuestAgent guest_agent: :class:`GuestAgent` object
:param domain: libvirt domain object
"""
self.domain = domain
self.connection = domain.connect()
self.name = domain.name()
self.guest_agent = GuestAgent(domain)
def _expand_instance_state(self, state: int) -> str:
states = {
libvirt.VIR_DOMAIN_NOSTATE: 'nostate',
libvirt.VIR_DOMAIN_RUNNING: 'running',
libvirt.VIR_DOMAIN_BLOCKED: 'blocked',
libvirt.VIR_DOMAIN_PAUSED: 'paused',
libvirt.VIR_DOMAIN_SHUTDOWN: 'shutdown',
libvirt.VIR_DOMAIN_SHUTOFF: 'shutoff',
libvirt.VIR_DOMAIN_CRASHED: 'crashed',
libvirt.VIR_DOMAIN_PMSUSPENDED: 'pmsuspended',
}
return states[state]
def get_info(self) -> InstanceInfo:
"""Return instance info."""
info = self.domain.info()
return InstanceInfo(
state=self._expand_instance_state(info[0]),
max_memory=info[1],
memory=info[2],
nproc=info[3],
cputime=info[4],
)
def get_status(self) -> str:
"""
Return instance state: 'running', 'shutoff', etc.
Reference:
https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainState
"""
try:
state, _ = self.domain.state()
except libvirt.libvirtError as e:
raise InstanceError(
'Cannot fetch status of ' f'instance={self.name}: {e}'
) from e
return self._expand_instance_state(state)
def is_running(self) -> bool:
"""Return True if instance is running, else return False."""
if self.domain.isActive() != 1:
# 0 - is inactive, -1 - is error
return False
return True
def is_autostart(self) -> bool:
"""Return True if instance autostart is enabled, else return False."""
try:
return bool(self.domain.autostart())
except libvirt.libvirtError as e:
raise InstanceError(
f'Cannot get autostart status for '
f'instance={self.name}: {e}'
) from e
def get_max_memory(self) -> int:
"""Maximum memory value for domain in KiB."""
return self.domain.maxMemory()
def get_max_vcpus(self) -> int:
"""Maximum vCPUs number for domain."""
return self.domain.maxVcpus()
def start(self) -> None:
"""Start defined instance."""
log.info('Starting instnce=%s', self.name)
if self.is_running():
log.warning(
'Already started, nothing to do instance=%s', self.name
)
return
try:
self.domain.create()
except libvirt.libvirtError as e:
raise InstanceError(
f'Cannot start instance={self.name}: {e}'
) from e
def shutdown(self, method: str | None = None) -> None:
"""
Shutdown instance.
Shutdown methods:
SOFT
Use guest agent to shutdown. If guest agent is unavailable
NORMAL method will be used.
NORMAL
Use method choosen by hypervisor to shutdown. Usually send ACPI
signal to guest OS. OS may ignore ACPI e.g. if guest is hanged.
HARD
Shutdown instance without any guest OS shutdown. This is simular
to unplugging machine from power. Internally send SIGTERM to
instance process and destroy it gracefully.
UNSAFE
Force shutdown. Internally send SIGKILL to instance process.
There is high data corruption risk!
If method is None NORMAL method will used.
:param method: Method used to shutdown instance
"""
methods = {
'SOFT': libvirt.VIR_DOMAIN_SHUTDOWN_GUEST_AGENT,
'NORMAL': libvirt.VIR_DOMAIN_SHUTDOWN_DEFAULT,
'HARD': libvirt.VIR_DOMAIN_DESTROY_GRACEFUL,
'UNSAFE': libvirt.VIR_DOMAIN_DESTROY_DEFAULT,
}
if method is None:
method = 'NORMAL'
if not isinstance(method, str):
raise TypeError(
f"Shutdown method must be a 'str', not {type(method)}"
)
method = method.upper()
if method not in methods:
raise ValueError(f"Unsupported shutdown method: '{method}'")
try:
if method in ['SOFT', 'NORMAL']:
self.domain.shutdownFlags(flags=methods[method])
elif method in ['HARD', 'UNSAFE']:
self.domain.destroyFlags(flags=methods[method])
except libvirt.libvirtError as e:
raise InstanceError(
f'Cannot shutdown instance={self.name} ' f'{method=}: {e}'
) from e
def reboot(self) -> None:
"""Send ACPI signal to guest OS to reboot. OS may ignore this."""
try:
self.domain.reboot()
except libvirt.libvirtError as e:
raise InstanceError(
f'Cannot reboot instance={self.name}: {e}'
) from e
def reset(self) -> None:
"""
Reset instance.
Copypaste from libvirt doc:
Reset a domain immediately without any guest OS shutdown.
Reset emulates the power reset button on a machine, where all
hardware sees the RST line set and reinitializes internal state.
Note that there is a risk of data loss caused by reset without any
guest OS shutdown.
"""
try:
self.domain.reset()
except libvirt.libvirtError as e:
raise InstanceError(
f'Cannot reset instance={self.name}: {e}'
) from e
def power_reset(self) -> None:
"""
Shutdown instance and start.
By analogy with real hardware, this is a normal server shutdown,
and then turning off from the power supply and turning it on again.
This method is applicable in cases where there has been a
configuration change in libvirt and you need to restart the
instance to apply the new configuration.
"""
self.shutdown(method='NORMAL')
self.start()
def set_autostart(self, *, enabled: bool) -> None:
"""
Set autostart flag for instance.
:param enabled: Bool argument to set or unset autostart flag.
"""
autostart = 1 if enabled else 0
try:
self.domain.setAutostart(autostart)
except libvirt.libvirtError as e:
raise InstanceError(
f'Cannot set autostart flag for instance={self.name} '
f'{autostart=}: {e}'
) from e
def set_vcpus(self, nvcpus: int, *, live: bool = False) -> None:
"""
Set vCPU number.
If `live` is True and instance is not currently running vCPUs
will set in config and will applied when instance boot.
NB: Note that if this call is executed before the guest has
finished booting, the guest may fail to process the change.
:param nvcpus: Number of vCPUs
:param live: Affect a running instance
"""
if nvcpus <= 0:
raise InstanceError('Cannot set zero vCPUs')
if nvcpus > self.get_max_vcpus():
raise InstanceError('vCPUs count is greather than max_vcpus')
if nvcpus == self.get_info().nproc:
log.warning(
'Instance instance=%s already have %s vCPUs, nothing to do',
self.name,
nvcpus,
)
return
try:
flags = libvirt.VIR_DOMAIN_AFFECT_CONFIG
self.domain.setVcpusFlags(nvcpus, flags=flags)
if live is True:
if not self.is_running():
log.warning(
'Instance is not running, changes applied in '
'instance config.'
)
return
flags = libvirt.VIR_DOMAIN_AFFECT_LIVE
self.domain.setVcpusFlags(nvcpus, flags=flags)
if self.guest_agent.is_available():
try:
self.guest_agent.raise_for_commands(
['guest-set-vcpus']
)
flags = libvirt.VIR_DOMAIN_VCPU_GUEST
self.domain.setVcpusFlags(nvcpus, flags=flags)
except GuestAgentCommandNotSupportedError:
log.warning(
'Cannot set vCPUs in guest via agent, you may '
'need to apply changes in guest manually.'
)
else:
log.warning(
'Cannot set vCPUs in guest OS on instance=%s. '
'You may need to apply CPUs in guest manually.',
self.name,
)
except libvirt.libvirtError as e:
raise InstanceError(
f'Cannot set vCPUs for instance={self.name}: {e}'
) from e
def set_memory(self, memory: int, *, live: bool = False) -> None:
"""
Set memory.
If `live` is True and instance is not currently running set memory
in config and will applied when instance boot.
:param memory: Memory value in mebibytes
:param live: Affect a running instance
"""
if memory <= 0:
raise InstanceError('Cannot set zero memory')
if (memory * 1024) > self.get_max_memory():
raise InstanceError('Memory is greather than max_memory')
if (memory * 1024) == self.get_info().memory:
log.warning(
"Instance '%s' already have %s memory, nothing to do",
self.name,
memory,
)
return
if live and self.is_running():
flags = (
libvirt.VIR_DOMAIN_AFFECT_LIVE
| libvirt.VIR_DOMAIN_AFFECT_CONFIG
)
else:
flags = libvirt.VIR_DOMAIN_AFFECT_CONFIG
try:
self.domain.setMemoryFlags(memory * 1024, flags=flags)
except libvirt.libvirtError as e:
msg = f'Cannot set memory for instance={self.name} {memory=}: {e}'
raise InstanceError(msg) from e
def _get_disk_by_target(self, target: str) -> etree.Element:
xml = etree.fromstring(self.dump_xml()) # noqa: S320
child = xml.xpath(f'/domain/devices/disk/target[@dev="{target}"]')
return child[0].getparent() if child else None
def attach_device(
self, device: DeviceConfig, *, live: bool = False
) -> None:
"""
Attach device to compute instance.
:param device: Object with device description e.g. DiskConfig
:param live: Affect a running instance
"""
if live and self.is_running():
flags = (
libvirt.VIR_DOMAIN_AFFECT_LIVE
| libvirt.VIR_DOMAIN_AFFECT_CONFIG
)
else:
flags = libvirt.VIR_DOMAIN_AFFECT_CONFIG
if isinstance(device, DiskConfig): # noqa: SIM102
if self._get_disk_by_target(device.target):
log.warning(
"Volume with target '%s' is already attached",
device.target,
)
return
self.domain.attachDeviceFlags(device.to_xml(), flags=flags)
def detach_device(
self, device: DeviceConfig, *, live: bool = False
) -> None:
"""
Dettach device from compute instance.
:param device: Object with device description e.g. DiskConfig
:param live: Affect a running instance
"""
if live and self.is_running():
flags = (
libvirt.VIR_DOMAIN_AFFECT_LIVE
| libvirt.VIR_DOMAIN_AFFECT_CONFIG
)
else:
flags = libvirt.VIR_DOMAIN_AFFECT_CONFIG
if isinstance(device, DiskConfig): # noqa: SIM102
if self._get_disk_by_target(device.target) is None:
log.warning(
"Volume with target '%s' is already detached",
device.target,
)
return
self.domain.detachDeviceFlags(device.to_xml(), flags=flags)
def detach_disk(self, name: str) -> None:
"""
Detach disk device by target name.
There is no ``attach_disk()`` method. Use :func:`attach_device`
with :class:`DiskConfig` as argument.
:param name: Disk name e.g. 'vda', 'sda', etc. This name may
not match the name of the disk inside the guest OS.
"""
xml = self._get_disk_by_target(name)
if xml is None:
log.warning(
"Volume with target '%s' is already detached",
name,
)
return
disk_params = {
'disk_type': xml.get('type'),
'source': xml.find('source').get('file'),
'target': xml.find('target').get('dev'),
'readonly': False if xml.find('readonly') is None else True, # noqa: SIM211
}
for param in disk_params:
if disk_params[param] is None:
msg = (
f"Cannot detach volume with target '{name}': "
f"parameter '{param}' is not defined in libvirt XML "
'config on host.'
)
raise InstanceError(msg)
self.detach_device(DiskConfig(**disk_params), live=True)
def resize_disk(
self, name: str, capacity: int, unit: units.DataUnit
) -> None:
"""
Resize attached block device.
:param name: Disk device name e.g. `vda`, `sda`, etc.
:param capacity: New capacity.
:param unit: Capacity unit.
"""
self.domain.blockResize(
name,
units.to_bytes(capacity, unit=unit),
flags=libvirt.VIR_DOMAIN_BLOCK_RESIZE_BYTES,
)
def get_disks(self) -> list[DiskConfig]:
"""Return list of attached disks."""
raise NotImplementedError
def pause(self) -> None:
"""Pause instance."""
if not self.is_running():
raise InstanceError('Cannot pause inactive instance')
self.domain.suspend()
def resume(self) -> None:
"""Resume paused instance."""
self.domain.resume()
def get_ssh_keys(self, user: str) -> list[str]:
"""
Return list of SSH keys on guest for specific user.
:param user: Username.
"""
raise NotImplementedError
def set_ssh_keys(self, user: str, ssh_keys: list[str]) -> None:
"""
Add SSH keys to guest for specific user.
:param user: Username.
:param ssh_keys: List of public SSH keys.
"""
raise NotImplementedError
def delete_ssh_keys(self, user: str, ssh_keys: list[str]) -> None:
"""
Remove SSH keys from guest for specific user.
:param user: Username.
:param ssh_keys: List of public SSH keys.
"""
raise NotImplementedError
def set_user_password(
self, user: str, password: str, *, encrypted: bool = False
) -> None:
"""
Set new user password in guest OS.
This action performs by guest agent inside the guest.
:param user: Username.
:param password: Password.
:param encrypted: Set it to True if password is already encrypted.
Right encryption method depends on guest OS.
"""
if not self.guest_agent.is_available():
raise InstanceError(
'Cannot change password: guest agent is unavailable'
)
self.guest_agent.raise_for_commands(['guest-set-user-password'])
flags = libvirt.VIR_DOMAIN_PASSWORD_ENCRYPTED if encrypted else 0
self.domain.setUserPassword(user, password, flags=flags)
def dump_xml(self, *, inactive: bool = False) -> str:
"""Return instance XML description."""
flags = libvirt.VIR_DOMAIN_XML_INACTIVE if inactive else 0
return self.domain.XMLDesc(flags)
def delete(self) -> None:
"""Undefine instance."""
# TODO @ge: delete local disks
self.shutdown(method='HARD')
self.domain.undefine()

View File

@ -0,0 +1,165 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Compute instance related objects schemas."""
import re
from enum import StrEnum
from pathlib import Path
from pydantic import BaseModel, Extra, validator
from compute.utils.units import DataUnit
class EntityModel(BaseModel):
"""Basic entity model."""
class Config:
"""Do not allow extra fields."""
extra = Extra.forbid
class CPUEmulationMode(StrEnum):
"""CPU emulation mode enumerated."""
HOST_PASSTHROUGH = 'host-passthrough'
HOST_MODEL = 'host-model'
CUSTOM = 'custom'
MAXIMUM = 'maximum'
class CPUTopologySchema(EntityModel):
"""CPU topology model."""
sockets: int
cores: int
threads: int
dies: int = 1
class CPUFeaturesSchema(EntityModel):
"""CPU features model."""
require: list[str]
disable: list[str]
class CPUSchema(EntityModel):
"""CPU model."""
emulation_mode: CPUEmulationMode
model: str | None
vendor: str | None
topology: CPUTopologySchema | None
features: CPUFeaturesSchema | None
class VolumeType(StrEnum):
"""Storage volume types enumeration."""
FILE = 'file'
class VolumeCapacitySchema(EntityModel):
"""Storage volume capacity field model."""
value: int
unit: DataUnit
class VolumeSchema(EntityModel):
"""Storage volume model."""
type: VolumeType # noqa: A003
target: str
capacity: VolumeCapacitySchema
source: str | None = None
is_readonly: bool = False
is_system: bool = False
class NetworkInterfaceSchema(EntityModel):
"""Network inerface model."""
source: str
mac: str
class BootOptionsSchema(EntityModel):
"""Instance boot settings."""
order: tuple
class InstanceSchema(EntityModel):
"""Compute instance model."""
name: str
title: str | None
description: str | None
memory: int
max_memory: int
vcpus: int
max_vcpus: int
cpu: CPUSchema
machine: str
emulator: Path
arch: str
boot: BootOptionsSchema
volumes: list[VolumeSchema]
network_interfaces: list[NetworkInterfaceSchema]
image: str | None = None
@validator('name')
def _check_name(cls, value: str) -> str: # noqa: N805
if not re.match(r'^[a-z0-9_]+$', value):
msg = (
'Name can contain only lowercase letters, numbers '
'and underscore.'
)
raise ValueError(msg)
return value
@validator('cpu')
def _check_topology(cls, cpu: int, values: dict) -> CPUSchema: # noqa: N805
topo = cpu.topology
max_vcpus = values['max_vcpus']
if topo and topo.sockets * topo.cores * topo.threads != max_vcpus:
msg = f'CPU topology does not match with {max_vcpus=}'
raise ValueError(msg)
return cpu
@validator('volumes')
def _check_volumes(cls, volumes: list) -> list: # noqa: N805
if len([v for v in volumes if v.is_system is True]) != 1:
msg = 'volumes list must contain one system volume'
raise ValueError(msg)
vol_with_source = 0
for vol in volumes:
if vol.is_system is True and vol.is_readonly is True:
msg = 'volume marked as system cannot be readonly'
raise ValueError(msg)
if vol.source is not None:
vol_with_source += 1
return volumes
@validator('network_interfaces')
def _check_network_interfaces(cls, value: list) -> list: # noqa: N805
if not value:
msg = 'Network interfaces list must contain at least one element'
raise ValueError(msg)
return value

View File

@ -0,0 +1,286 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Hypervisor session manager."""
import logging
import os
from contextlib import AbstractContextManager
from types import TracebackType
from typing import Any, NamedTuple
from uuid import uuid4
import libvirt
from lxml import etree
from .exceptions import (
InstanceNotFoundError,
SessionError,
StoragePoolNotFoundError,
)
from .instance import Instance, InstanceConfig, InstanceSchema
from .storage import DiskConfig, StoragePool, VolumeConfig
from .utils import units
log = logging.getLogger(__name__)
class Capabilities(NamedTuple):
"""Store domain capabilities info."""
arch: str
virt_type: str
emulator: str
machine: str
max_vcpus: int
cpu_vendor: str
cpu_model: str
cpu_features: dict
usable_cpus: list[dict]
class NodeInfo(NamedTuple):
"""
Store compute node info.
See https://libvirt.org/html/libvirt-libvirt-host.html#virNodeInfo
NOTE: memory unit in libvirt docs is wrong! Actual unit is MiB.
"""
arch: str
memory: int
cpus: int
mhz: int
nodes: int
sockets: int
cores: int
threads: int
class Session(AbstractContextManager):
"""
Hypervisor session context manager.
:cvar IMAGES_POOL: images storage pool name taken from env
:cvar VOLUMES_POOL: volumes storage pool name taken from env
"""
IMAGES_POOL = os.getenv('CMP_IMAGES_POOL')
VOLUMES_POOL = os.getenv('CMP_VOLUMES_POOL')
def __init__(self, uri: str | None = None):
"""
Initialise session with hypervisor.
:ivar str uri: libvirt connection URI.
:ivar libvirt.virConnect connection: libvirt connection object.
:param uri: libvirt connection URI.
"""
self.uri = uri or 'qemu:///system'
self.connection = libvirt.open(self.uri)
def __enter__(self):
"""Return Session object."""
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_traceback: TracebackType | None,
):
"""Close the connection when leaving the context."""
self.close()
def close(self) -> None:
"""Close connection to libvirt daemon."""
self.connection.close()
def get_node_info(self) -> NodeInfo:
"""Return information about compute node."""
info = self.connection.getInfo()
return NodeInfo(
arch=info[0],
memory=info[1],
cpus=info[2],
mhz=info[3],
nodes=info[4],
sockets=info[5],
cores=info[6],
threads=info[7],
)
def _cap_get_usable_cpus(self, xml: etree.Element) -> list[dict]:
x = xml.xpath('/domainCapabilities/cpu/mode[@name="custom"]')[0]
cpus = []
for cpu in x.findall('model'):
if cpu.get('usable') == 'yes':
cpus.append( # noqa: PERF401
{
'vendor': cpu.get('vendor'),
'model': cpu.text,
}
)
return cpus
def _cap_get_cpu_features(self, xml: etree.Element) -> dict:
x = xml.xpath('/domainCapabilities/cpu/mode[@name="host-model"]')[0]
require = []
disable = []
for feature in x.findall('feature'):
policy = feature.get('policy')
name = feature.get('name')
if policy == 'require':
require.append(name)
if policy == 'disable':
disable.append(name)
return {'require': require, 'disable': disable}
def get_capabilities(self) -> Capabilities:
"""Return capabilities e.g. arch, virt, emulator, etc."""
prefix = '/domainCapabilities'
hprefix = f'{prefix}/cpu/mode[@name="host-model"]'
caps = etree.fromstring(self.connection.getDomainCapabilities()) # noqa: S320
return Capabilities(
arch=caps.xpath(f'{prefix}/arch/text()')[0],
virt_type=caps.xpath(f'{prefix}/domain/text()')[0],
emulator=caps.xpath(f'{prefix}/path/text()')[0],
machine=caps.xpath(f'{prefix}/machine/text()')[0],
max_vcpus=int(caps.xpath(f'{prefix}/vcpu/@max')[0]),
cpu_vendor=caps.xpath(f'{hprefix}/vendor/text()')[0],
cpu_model=caps.xpath(f'{hprefix}/model/text()')[0],
cpu_features=self._cap_get_cpu_features(caps),
usable_cpus=self._cap_get_cpus(caps),
)
def create_instance(self, **kwargs: Any) -> Instance:
"""
Create and return new compute instance.
:param name: Instance name.
:type name: str
:param title: Instance title for humans.
:type title: str
:param description: Some information about instance.
:type description: str
:param memory: Memory in MiB.
:type memory: int
:param max_memory: Maximum memory in MiB.
:type max_memory: int
:param vcpus: Number of vCPUs.
:type vcpus: int
:param max_vcpus: Maximum vCPUs.
:type max_vcpus: int
:param cpu: CPU configuration. See :class:`CPUSchema` for info.
:type cpu: dict
:param machine: QEMU emulated machine.
:type machine: str
:param emulator: Path to emulator.
:type emulator: str
:param arch: CPU architecture to virtualization.
:type arch: str
:param boot: Boot settings. See :class:`BootOptionsSchema`.
:type boot: dict
:param image: Source disk image name for system disk.
:type image: str
:param volumes: List of storage volume configs. For more info
see :class:`VolumeSchema`.
:type volumes: list[dict]
:param network_interfaces: List of virtual network interfaces
configs. See :class:`NetworkInterfaceSchema` for more info.
:type network_interfaces: list[dict]
"""
data = InstanceSchema(**kwargs)
config = InstanceConfig(data)
log.info('Define XML...')
log.info(config.to_xml())
self.connection.defineXML(config.to_xml())
log.info('Getting instance...')
instance = self.get_instance(config.name)
log.info('Creating volumes...')
for volume in data.volumes:
log.info('Creating volume=%s', volume)
capacity = units.to_bytes(
volume.capacity.value, volume.capacity.unit
)
log.info('Connecting to images pool...')
images_pool = self.get_storage_pool(self.IMAGES_POOL)
log.info('Connecting to volumes pool...')
volumes_pool = self.get_storage_pool(self.VOLUMES_POOL)
log.info('Building volume configuration...')
if not volume.source:
vol_name = f'{uuid4()}.qcow2'
else:
vol_name = volume.source
vol_conf = VolumeConfig(
name=vol_name,
path=str(volumes_pool.path.joinpath(vol_name)),
capacity=capacity,
)
log.info('Volume configuration is:\n %s', vol_conf.to_xml())
if volume.is_system is True and data.image:
log.info(
"Volume is marked as 'system', start cloning image..."
)
log.info('Get image %s', data.image)
image = images_pool.get_volume(data.image)
log.info('Cloning image into volumes pool...')
vol = volumes_pool.clone_volume(image, vol_conf)
log.info(
'Resize cloned volume to specified size: %s',
capacity,
)
vol.resize(capacity, unit=units.DataUnit.BYTES)
else:
log.info('Create volume...')
volumes_pool.create_volume(vol_conf)
log.info('Attaching volume to instance...')
instance.attach_device(
DiskConfig(
disk_type=volume.type,
source=vol_conf.path,
target=volume.target,
readonly=volume.is_readonly,
)
)
return instance
def get_instance(self, name: str) -> Instance:
"""Get compute instance by name."""
try:
return Instance(self.connection.lookupByName(name))
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
raise InstanceNotFoundError(name) from e
raise SessionError(e) from e
def list_instances(self) -> list[Instance]:
"""List all instances."""
return [Instance(dom) for dom in self.connection.listAllDomains()]
def get_storage_pool(self, name: str) -> StoragePool:
"""Get storage pool by name."""
try:
return StoragePool(self.connection.storagePoolLookupByName(name))
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_NO_STORAGE_POOL:
raise StoragePoolNotFoundError(name) from e
raise SessionError(e) from e
def list_storage_pools(self) -> list[StoragePool]:
"""List all strage pools."""
return [StoragePool(p) for p in self.connection.listStoragePools()]

View File

@ -0,0 +1,17 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from .pool import StoragePool
from .volume import DiskConfig, Volume, VolumeConfig

View File

@ -0,0 +1,124 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Manage storage pools."""
import logging
from pathlib import Path
from typing import NamedTuple
import libvirt
from lxml import etree
from compute.exceptions import StoragePoolError, VolumeNotFoundError
from .volume import Volume, VolumeConfig
log = logging.getLogger(__name__)
class StoragePoolUsageInfo(NamedTuple):
"""Storage pool usage info."""
capacity: int
allocation: int
available: int
class StoragePool:
"""Storage pool manipulating class."""
def __init__(self, pool: libvirt.virStoragePool):
"""Initislise StoragePool."""
self.pool = pool
self.name = pool.name()
self.path = self._get_path()
def _get_path(self) -> Path:
"""Return storage pool path."""
xml = etree.fromstring(self.pool.XMLDesc()) # noqa: S320
return Path(xml.xpath('/pool/target/path/text()')[0])
def get_usage_info(self) -> StoragePoolUsageInfo:
"""Return info about storage pool usage."""
xml = etree.fromstring(self.pool.XMLDesc()) # noqa: S320
return StoragePoolUsageInfo(
capacity=int(xml.xpath('/pool/capacity/text()')[0]),
allocation=int(xml.xpath('/pool/allocation/text()')[0]),
available=int(xml.xpath('/pool/available/text()')[0]),
)
def dump_xml(self) -> str:
"""Return storage pool XML description as string."""
return self.pool.XMLDesc()
def refresh(self) -> None:
"""Refresh storage pool."""
# TODO @ge: handle libvirt asynchronous job related exceptions
self.pool.refresh()
def create_volume(self, vol_conf: VolumeConfig) -> Volume:
"""Create storage volume and return Volume instance."""
log.info(
'Create storage volume vol=%s in pool=%s', vol_conf.name, self.name
)
vol = self.pool.createXML(
vol_conf.to_xml(),
flags=libvirt.VIR_STORAGE_VOL_CREATE_PREALLOC_METADATA,
)
return Volume(self.pool, vol)
def clone_volume(self, src: Volume, dst: VolumeConfig) -> Volume:
"""
Make storage volume copy.
:param src: Input volume
:param dst: Output volume config
"""
log.info(
'Start volume cloning '
'src_pool=%s src_vol=%s dst_pool=%s dst_vol=%s',
src.pool_name,
src.name,
self.pool.name,
dst.name,
)
vol = self.pool.createXMLFrom(
dst.to_xml(), # new volume XML description
src.vol, # source volume virStorageVol object
flags=libvirt.VIR_STORAGE_VOL_CREATE_PREALLOC_METADATA,
)
if vol is None:
raise StoragePoolError
return Volume(self.pool, vol)
def get_volume(self, name: str) -> Volume | None:
"""Lookup and return Volume instance or None."""
log.info(
'Lookup for storage volume vol=%s in pool=%s', name, self.pool.name
)
try:
vol = self.pool.storageVolLookupByName(name)
return Volume(self.pool, vol)
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_NO_STORAGE_VOL:
raise VolumeNotFoundError(name) from e
log.exception('unexpected error from libvirt')
raise StoragePoolError(e) from e
def list_volumes(self) -> list[Volume]:
"""Return list of volumes in storage pool."""
return [Volume(self.pool, vol) for vol in self.pool.listAllVolumes()]

View File

@ -0,0 +1,138 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Manage storage volumes."""
from dataclasses import dataclass
from pathlib import Path
from time import time
import libvirt
from lxml import etree
from lxml.builder import E
from compute.common import DeviceConfig, EntityConfig
from compute.utils import units
@dataclass
class VolumeConfig(EntityConfig):
"""
Storage volume XML config builder.
Generate XML config for creating a volume in a libvirt
storage pool.
"""
name: str
path: str
capacity: int
def to_xml(self) -> str:
"""Return XML config for libvirt."""
unixtime = str(int(time()))
xml = E.volume(type='file')
xml.append(E.name(self.name))
xml.append(E.key(self.path))
xml.append(E.source())
xml.append(E.capacity(str(self.capacity), unit='bytes'))
xml.append(E.allocation('0'))
xml.append(
E.target(
E.path(self.path),
E.format(type='qcow2'),
E.timestamps(
E.atime(unixtime), E.mtime(unixtime), E.ctime(unixtime)
),
E.compat('1.1'),
E.features(E.lazy_refcounts()),
)
)
return etree.tostring(xml, encoding='unicode', pretty_print=True)
@dataclass
class DiskConfig(DeviceConfig):
"""
Disk XML config builder.
Generate XML config for attaching or detaching storage volumes
to compute instances.
"""
disk_type: str
source: str | Path
target: str
readonly: bool = False
def to_xml(self) -> str:
"""Return XML config for libvirt."""
xml = E.disk(type=self.disk_type, device='disk')
xml.append(E.driver(name='qemu', type='qcow2', cache='writethrough'))
if self.disk_type == 'file':
xml.append(E.source(file=str(self.source)))
xml.append(E.target(dev=self.target, bus='virtio'))
if self.readonly:
xml.append(E.readonly())
return etree.tostring(xml, encoding='unicode', pretty_print=True)
class Volume:
"""Storage volume manipulating class."""
def __init__(
self, pool: libvirt.virStoragePool, vol: libvirt.virStorageVol
):
"""
Initialise Volume.
:param pool: libvirt virStoragePool object
:param vol: libvirt virStorageVol object
"""
self.pool = pool
self.pool_name = pool.name()
self.vol = vol
self.name = vol.name()
self.path = Path(vol.path())
def dump_xml(self) -> str:
"""Return volume XML description as string."""
return self.vol.XMLDesc()
def clone(self, vol_conf: VolumeConfig) -> None:
"""
Make a copy of volume to the same storage pool.
:param vol_info VolumeInfo: New storage volume dataclass object
"""
self.pool.createXMLFrom(
vol_conf.to_xml(),
self.vol,
flags=libvirt.VIR_STORAGE_VOL_CREATE_PREALLOC_METADATA,
)
def resize(self, capacity: int, unit: units.DataUnit) -> None:
"""
Resize volume.
:param capacity int: Volume new capacity.
:param unit DataUnit: Data unit. Internally converts into bytes.
"""
# TODO @ge: Check actual volume size before resize
self.vol.resize(units.to_bytes(capacity, unit=unit))
def delete(self) -> None:
"""Delete volume from storage pool."""
self.vol.delete()

View File

@ -0,0 +1,56 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Configuration loader."""
import tomllib
from collections import UserDict
from pathlib import Path
from compute.exceptions import ConfigLoaderError
DEFAULT_CONFIGURATION = {}
DEFAULT_CONFIG_FILE = '/etc/computed/computed.toml'
class ConfigLoader(UserDict):
"""UserDict for storing configuration."""
def __init__(self, file: Path | None = None):
"""
Initialise ConfigLoader.
:param file: Path to configuration file. If `file` is None
use default path from DEFAULT_CONFIG_FILE constant.
"""
# TODO @ge: load deafult configuration
self.file = Path(file) if file else Path(DEFAULT_CONFIG_FILE)
super().__init__(self.load())
def load(self) -> dict:
"""Load confguration object from TOML file."""
try:
with Path(self.file).open('rb') as configfile:
return tomllib.load(configfile)
# TODO @ge: add config schema validation
except tomllib.TOMLDecodeError as tomlerr:
raise ConfigLoaderError(
f'Bad TOML syntax in config file: {self.file}: {tomlerr}'
) from tomlerr
except (OSError, ValueError) as readerr:
raise ConfigLoaderError(
f'Cannot read config file: {self.file}: {readerr}'
) from readerr

View File

@ -0,0 +1,33 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Random identificators."""
# ruff: noqa: S311, C417
import random
def random_mac() -> str:
"""Retrun random MAC address."""
mac = [
0x00,
0x16,
0x3E,
random.randint(0x00, 0x7F),
random.randint(0x00, 0xFF),
random.randint(0x00, 0xFF),
]
return ':'.join(map(lambda x: '%02x' % x, mac))

View File

@ -0,0 +1,54 @@
# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
"""Tools for data units convertion."""
from enum import StrEnum
class DataUnit(StrEnum):
"""Data units enumerated."""
BYTES = 'bytes'
KIB = 'KiB'
MIB = 'MiB'
GIB = 'GiB'
TIB = 'TiB'
class InvalidDataUnitError(ValueError):
"""Data unit is not valid."""
def __init__(self, msg: str):
"""Initialise InvalidDataUnitError."""
super().__init__(
f'{msg}, valid units are: {", ".join(list(DataUnit))}'
)
def to_bytes(value: int, unit: DataUnit = DataUnit.BYTES) -> int:
"""Convert value to bytes. See :class:`DataUnit`."""
try:
_ = DataUnit(unit)
except ValueError as e:
raise InvalidDataUnitError(e) from e
powers = {
DataUnit.BYTES: 0,
DataUnit.KIB: 1,
DataUnit.MIB: 2,
DataUnit.GIB: 3,
DataUnit.TIB: 4,
}
return value * pow(1024, powers[unit])