cool updates

This commit is contained in:
ge
2023-07-22 23:59:49 +03:00
parent b608d88265
commit 754c608826
22 changed files with 926 additions and 213 deletions

View File

@ -0,0 +1,2 @@
from .main import VirtualMachine
from .ga import QemuAgent

22
node_agent/vm/base.py Normal file
View File

@ -0,0 +1,22 @@
import libvirt
from ..main import LibvirtSession
from ..exceptions import VMNotFound
class VMBase:
def __init__(self, session: LibvirtSession, name: str):
self.domname = name
self.session = session.session # virConnect object
self.config = session.config # ConfigLoader object
self.domain = self._get_domain(name)
def _get_domain(self, name: str) -> libvirt.virDomain:
"""Get virDomain object by name to manipulate with domain."""
try:
domain = self.session.lookupByName(name)
if domain is not None:
return domain
raise VMNotFound(name)
except libvirt.libvirtError as err:
raise VMNotFound(err) from err

192
node_agent/vm/ga.py Normal file
View File

@ -0,0 +1,192 @@
import json
import logging
from time import time, sleep
from base64 import standard_b64encode, b64decode, b64encode
import libvirt
import libvirt_qemu
from ..main import LibvirtSession
from ..exceptions import QemuAgentError
from .base import VMBase
logger = logging.getLogger(__name__)
DEFAULT_WAIT_TIMEOUT = 60 # seconds
POLL_INTERVAL = 0.3
class QemuAgent(VMBase):
"""
Interacting with QEMU guest agent. Methods:
execute()
Low-level method for executing QEMU command as dict. Command dict
internally converts to JSON. See method docstring for more info.
shellexec()
High-level method for executing shell commands on guest. Command
must be passed as string. Wraps execute() method.
_execute()
Just executes QEMU command. Wraps libvirt_qemu.qemuAgentCommand()
_get_cmd_result()
Intended for long-time commands. This function loops and every
POLL_INTERVAL calls 'guest-exec-status' for specified guest PID.
Polling ends on command exited or on timeout.
_return_tuple()
This method transforms JSON command output to tuple and decode
base64 encoded strings optionally.
"""
def __init__(self,
session: LibvirtSession,
name: str,
timeout: int | None = None,
flags: int | None = None
):
super().__init__(session, name)
self.timeout = timeout or DEFAULT_WAIT_TIMEOUT # timeout for guest agent
self.flags = flags or libvirt_qemu.VIR_DOMAIN_QEMU_MONITOR_COMMAND_DEFAULT
def execute(
self,
command: dict,
stdin: str | None = None,
capture_output: bool = False,
decode_output: bool = False,
wait: bool = True,
timeout: int = DEFAULT_WAIT_TIMEOUT,
):
"""
Execute command on guest and return output if capture_output is True.
See https://wiki.qemu.org/Documentation/QMP for QEMU commands reference.
Return values:
tuple(
exited: bool | None,
exitcode: int | None,
stdout: str | None,
stderr: str | None
)
stdout and stderr are base64 encoded strings or None.
"""
# todo command dict schema validation
if capture_output:
command['arguments']['capture-output'] = True
if isinstance(stdin, str):
command['arguments']['input-data'] = standard_b64encode(
stdin.encode('utf-8')
).decode('utf-8')
# Execute command on guest
cmd_out = self._execute(command)
if capture_output:
cmd_pid = json.loads(cmd_out)['return']['pid']
return self._get_cmd_result(
cmd_pid,
decode_output=decode_output,
wait=wait,
timeout=timeout,
)
return None, None, None, None
def shellexec(
self,
command: str,
stdin: str | None = None,
executable: str = '/bin/sh',
capture_output: bool = False,
decode_output: bool = False,
wait: bool = True,
timeout: int = DEFAULT_WAIT_TIMEOUT,
):
"""
Execute command on guest with selected shell. /bin/sh by default.
Otherwise of execute() this function brings command as string.
"""
cmd = {
'execute': 'guest-exec',
'arguments': {
'path': executable,
'arg': ['-c', command],
}
}
return self.execute(
cmd,
stdin=stdin,
capture_output=capture_output,
decode_output=decode_output,
wait=wait,
timeout=timeout,
)
def _execute(self, command: dict):
logging.debug('Execute command: vm=%s cmd=%s', self.domname, command)
try:
return libvirt_qemu.qemuAgentCommand(
self.domain, # virDomain object
json.dumps(command),
self.timeout,
self.flags,
)
except libvirt.libvirtError as err:
raise QemuAgentError(err) from err
def _get_cmd_result(
self,
pid: int,
decode_output: bool = False,
wait: bool = True,
timeout: int = DEFAULT_WAIT_TIMEOUT,
):
"""Get executed command result. See GuestAgent.execute() for info."""
exited = exitcode = stdout = stderr = None
cmd = {
'execute': 'guest-exec-status',
'arguments': {'pid': pid},
}
if not wait:
output = json.loads(self._execute(cmd))
return self._return_tuple(output, decode=decode_output)
logger.debug('Start polling command pid=%s', pid)
start_time = time()
while True:
output = json.loads(self._execute(cmd))
if output['return']['exited']:
break
sleep(POLL_INTERVAL)
now = time()
if now - start_time > timeout:
raise QemuAgentError(
f'Polling command pid={pid} took longer than {timeout} seconds.'
)
logger.debug(
'Polling command pid=%s finished, time taken: %s seconds',
pid, int(time()-start_time)
)
return self._return_tuple(output, decode=decode_output)
def _return_tuple(self, cmd_output: dict, decode: bool = False):
exited = cmd_output['return']['exited']
exitcode = cmd_output['return']['exitcode']
try:
stdout = cmd_output['return']['out-data']
if decode and stdout:
stdout = b64decode(stdout).decode('utf-8')
except KeyError:
stdout = None
try:
stderr = cmd_output['return']['err-data']
if decode and stderr:
stderr = b64decode(stderr).decode('utf-8')
except KeyError:
stderr = None
return exited, exitcode, stdout, stderr

126
node_agent/vm/main.py Normal file
View File

@ -0,0 +1,126 @@
import logging
import libvirt
from ..exceptions import VMError
from .base import VMBase
logger = logging.getLogger(__name__)
class VirtualMachine(VMBase):
@property
def name(self):
return self.domain.name()
@property
def status(self) -> str:
"""
Return VM state: 'running', 'shutoff', etc. Reference:
https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainState
"""
state = self.domain.info()[0]
match state:
case libvirt.VIR_DOMAIN_NOSTATE:
return 'nostate'
case libvirt.VIR_DOMAIN_RUNNING:
return 'running'
case libvirt.VIR_DOMAIN_BLOCKED:
return 'blocked'
case libvirt.VIR_DOMAIN_PAUSED:
return 'paused'
case libvirt.VIR_DOMAIN_SHUTDOWN:
return 'shutdown'
case libvirt.VIR_DOMAIN_SHUTOFF:
return 'shutoff'
case libvirt.VIR_DOMAIN_CRASHED:
return 'crashed'
case libvirt.VIR_DOMAIN_PMSUSPENDED:
return 'pmsuspended'
@property
def is_running(self) -> bool:
"""Return True if VM is running, else return False."""
if self.domain.isActive() != 1:
# inactive (0) or error (-1)
return False
return True
def start(self) -> None:
"""Start defined VM."""
logger.info('Starting VM: vm=%s', self.domname)
if self.is_running:
logger.debug('VM vm=%s is already started, nothing to do', self.domname)
return
try:
ret = self.domain.create()
except libvirt.libvirtError as err:
raise VMError(err) from err
if ret != 0:
raise VMError('Cannot start VM: vm=%s exit_code=%s', self.domname, ret)
def shutdown(self, force=False, sigkill=False) -> None:
"""
Send ACPI signal to guest OS to shutdown. OS may ignore this.
Use `force=True` for graceful VM destroy. Add `sigkill=True`
to hard shutdown (may corrupt guest data!).
"""
if sigkill:
flags = libvirt.VIR_DOMAIN_DESTROY_DEFAULT
else:
flags = libvirt.VIR_DOMAIN_DESTROY_GRACEFUL
if force:
ret = self.domain.destroyFlags(flags=flags)
else:
# Normal VM shutdown via ACPI signal, OS may ignore this.
ret = self.domain.shutdown()
if ret != 0:
raise VMError(
f'Cannot shutdown VM, try force or sigkill: %s', self.domname
)
def reset(self):
"""
Copypaste from libvirt doc::
Reset a domain immediately without any guest OS shutdown.
Reset emulates the power reset button on a machine, where all
hardware sees the RST line set and reinitializes internal state.
Note that there is a risk of data loss caused by reset without any
guest OS shutdown.
"""
ret = self.domian.reset()
if ret != 0:
raise VMError('Cannot reset VM: %s', self.domname)
def reboot(self) -> None:
"""Send ACPI signal to guest OS to reboot. OS may ignore this."""
ret = self.domain.reboot()
if ret != 0:
raise VMError('Cannot reboot: %s', self.domname)
def set_autostart(self) -> None:
ret = self.domain.autostart()
if ret != 0:
raise VMError('Cannot set : %s', self.domname)
def vcpu_set(self, count: int):
pass
def vram_set(self, count: int):
pass
def ssh_keys_list(self, user: str):
pass
def ssh_keys_add(self, user: str):
pass
def ssh_keys_remove(self, user: str):
pass
def set_user_password(self, user: str):
pass