python-compute/compute/storage/pool.py
2024-05-17 00:03:29 +03:00

153 lines
5.1 KiB
Python

# This file is part of Compute
#
# Compute is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Compute is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Compute. If not, see <http://www.gnu.org/licenses/>.
"""Manage storage pools."""
import datetime
import logging
import time
from datetime import datetime as dt
from datetime import timedelta
from pathlib import Path
from typing import NamedTuple
import libvirt
from lxml import etree
from compute.exceptions import StoragePoolError, VolumeNotFoundError
from .volume import Volume, VolumeConfig
log = logging.getLogger(__name__)
class StoragePoolUsageInfo(NamedTuple):
    """
    Usage figures of a storage pool.

    The three fields are filled from the ``capacity``, ``allocation`` and
    ``available`` elements of the pool XML description.
    """

    # Value of the /pool/capacity element.
    capacity: int
    # Value of the /pool/allocation element.
    allocation: int
    # Value of the /pool/available element.
    available: int
class StoragePool:
    """
    Storage pool manipulating class.

    Wraps a ``libvirt.virStoragePool`` object and exposes helpers to inspect
    the pool and to create, clone, look up and list its volumes.

    Attributes set by ``__init__``:

    * ``pool`` -- the wrapped libvirt storage pool object.
    * ``name`` -- pool name as returned by ``pool.name()`` at construction.
    * ``path`` -- pool target path read from the pool XML description.
    """

    def __init__(self, pool: libvirt.virStoragePool):
        """
        Initialise StoragePool.

        :param pool: libvirt storage pool object to wrap.
        """
        self.pool = pool
        self.name = pool.name()
        self.path = self._get_path()

    def _get_path(self) -> Path:
        """
        Return storage pool path.

        The path is the text of ``/pool/target/path`` in the pool XML
        description. An ``IndexError`` is raised if the element is absent.
        """
        xml = etree.fromstring(self.pool.XMLDesc())
        return Path(xml.xpath('/pool/target/path/text()')[0])

    def get_usage_info(self) -> StoragePoolUsageInfo:
        """
        Return info about storage pool usage.

        Values are parsed as integers from the ``capacity``, ``allocation``
        and ``available`` elements of the pool XML description.
        NOTE(review): presumably bytes (libvirt default unit) -- confirm.
        """
        xml = etree.fromstring(self.pool.XMLDesc())
        return StoragePoolUsageInfo(
            capacity=int(xml.xpath('/pool/capacity/text()')[0]),
            allocation=int(xml.xpath('/pool/allocation/text()')[0]),
            available=int(xml.xpath('/pool/available/text()')[0]),
        )

    def dump_xml(self) -> str:
        """Return storage pool XML description as string."""
        return self.pool.XMLDesc()

    def refresh(self, *, retry: bool = True, timeout: int = 30) -> None:
        """
        Refresh storage pool.

        The refresh is always attempted at least once. If libvirt reports
        that the pool has asynchronous jobs running and `retry` is true,
        the call is repeated once per second until it succeeds or `timeout`
        seconds have elapsed.

        :param retry: If True retry pool refresh on 'pool have running
            asynchronous jobs' error.
        :param timeout: Retry timeout in seconds. Affects only if `retry`
            is True.
        :raises StoragePoolError: On any libvirt error that is not
            retried, or when the retry timeout expires without a
            successful refresh.
        """
        # A monotonic clock keeps the deadline immune to wall-clock jumps.
        deadline = time.monotonic() + timeout
        while True:
            try:
                self.pool.refresh()
            except libvirt.libvirtError as e:
                pool_busy = (
                    'asynchronous jobs running' in e.get_error_message()
                )
                # Give up on unrelated errors, when retrying is disabled,
                # or once the retry window is exhausted. Raising here
                # (instead of returning) lets callers know the pool was
                # not refreshed.
                if (
                    not pool_busy
                    or not retry
                    or time.monotonic() >= deadline
                ):
                    raise StoragePoolError(e) from e
                log.debug(
                    'An error ocurred when refreshing storage pool '
                    'retrying after 1 sec...'
                )
                time.sleep(1)
            else:
                return

    def create_volume(self, vol_conf: VolumeConfig) -> Volume:
        """
        Create storage volume and return Volume instance.

        :param vol_conf: Config of the volume to create; its ``to_xml()``
            output is passed to libvirt.
        """
        log.info(
            'Create storage volume vol=%s in pool=%s', vol_conf.name, self.name
        )
        vol = self.pool.createXML(
            vol_conf.to_xml(),
            flags=libvirt.VIR_STORAGE_VOL_CREATE_PREALLOC_METADATA,
        )
        return Volume(self.pool, vol)

    def clone_volume(self, src: Volume, dst: VolumeConfig) -> Volume:
        """
        Make storage volume copy.

        :param src: Input volume
        :param dst: Output volume config
        :raises StoragePoolError: If libvirt returns no volume object.
        """
        log.info(
            'Start volume cloning '
            'src_pool=%s src_vol=%s dst_pool=%s dst_vol=%s',
            src.pool_name,
            src.name,
            self.name,
            dst.name,
        )
        vol = self.pool.createXMLFrom(
            dst.to_xml(),  # new volume XML description
            src.vol,  # source volume virStorageVol object
            flags=libvirt.VIR_STORAGE_VOL_CREATE_PREALLOC_METADATA,
        )
        if vol is None:
            raise StoragePoolError
        return Volume(self.pool, vol)

    def get_volume(self, name: str) -> Volume:
        """
        Lookup and return Volume instance.

        This method never returns None; a missing volume is reported with
        an exception.

        :param name: Name of the volume to look up in this pool.
        :raises VolumeNotFoundError: If libvirt reports that no such
            storage volume exists.
        :raises StoragePoolError: On any other libvirt error.
        """
        log.info(
            'Lookup for storage volume vol=%s in pool=%s',
            name,
            self.name,
        )
        try:
            vol = self.pool.storageVolLookupByName(name)
            return Volume(self.pool, vol)
        except libvirt.libvirtError as e:
            if e.get_error_code() == libvirt.VIR_ERR_NO_STORAGE_VOL:
                raise VolumeNotFoundError(name) from e
            log.exception('unexpected error from libvirt')
            raise StoragePoolError(e) from e

    def list_volumes(self) -> list[Volume]:
        """Return list of volumes in storage pool."""
        return [Volume(self.pool, vol) for vol in self.pool.listAllVolumes()]