153 lines
5.1 KiB
Python
153 lines
5.1 KiB
Python
# This file is part of Compute
|
|
#
|
|
# Compute is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Compute is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with Compute. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
"""Manage storage pools."""
|
|
|
|
import datetime
|
|
import logging
|
|
import time
|
|
from datetime import datetime as dt
|
|
from datetime import timedelta
|
|
from pathlib import Path
|
|
from typing import NamedTuple
|
|
|
|
import libvirt
|
|
from lxml import etree
|
|
|
|
from compute.exceptions import StoragePoolError, VolumeNotFoundError
|
|
|
|
from .volume import Volume, VolumeConfig
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class StoragePoolUsageInfo(NamedTuple):
|
|
"""Storage pool usage info."""
|
|
|
|
capacity: int
|
|
allocation: int
|
|
available: int
|
|
|
|
|
|
class StoragePool:
|
|
"""Storage pool manipulating class."""
|
|
|
|
def __init__(self, pool: libvirt.virStoragePool):
|
|
"""Initislise StoragePool."""
|
|
self.pool = pool
|
|
self.name = pool.name()
|
|
self.path = self._get_path()
|
|
|
|
def _get_path(self) -> Path:
|
|
"""Return storage pool path."""
|
|
xml = etree.fromstring(self.pool.XMLDesc())
|
|
return Path(xml.xpath('/pool/target/path/text()')[0])
|
|
|
|
def get_usage_info(self) -> StoragePoolUsageInfo:
|
|
"""Return info about storage pool usage."""
|
|
xml = etree.fromstring(self.pool.XMLDesc())
|
|
return StoragePoolUsageInfo(
|
|
capacity=int(xml.xpath('/pool/capacity/text()')[0]),
|
|
allocation=int(xml.xpath('/pool/allocation/text()')[0]),
|
|
available=int(xml.xpath('/pool/available/text()')[0]),
|
|
)
|
|
|
|
def dump_xml(self) -> str:
|
|
"""Return storage pool XML description as string."""
|
|
return self.pool.XMLDesc()
|
|
|
|
def refresh(self, *, retry: bool = True, timeout: int = 30) -> None:
|
|
"""
|
|
Refresh storage pool.
|
|
|
|
:param retry: If True retry pool refresh on 'pool have running
|
|
asynchronous jobs' error.
|
|
:param timeout: Retry timeout in seconds. Affects only if `retry`
|
|
is True.
|
|
"""
|
|
retry_timeout = dt.now(tz=datetime.UTC) + timedelta(seconds=timeout)
|
|
while dt.now(tz=datetime.UTC) < retry_timeout:
|
|
try:
|
|
self.pool.refresh()
|
|
except libvirt.libvirtError as e:
|
|
if 'asynchronous jobs running' in e.get_error_message():
|
|
if retry is False:
|
|
raise StoragePoolError(e) from e
|
|
log.debug(
|
|
'An error ocurred when refreshing storage pool '
|
|
'retrying after 1 sec...'
|
|
)
|
|
time.sleep(1)
|
|
else:
|
|
raise StoragePoolError(e) from e
|
|
else:
|
|
return
|
|
|
|
def create_volume(self, vol_conf: VolumeConfig) -> Volume:
|
|
"""Create storage volume and return Volume instance."""
|
|
log.info(
|
|
'Create storage volume vol=%s in pool=%s', vol_conf.name, self.name
|
|
)
|
|
vol = self.pool.createXML(
|
|
vol_conf.to_xml(),
|
|
flags=libvirt.VIR_STORAGE_VOL_CREATE_PREALLOC_METADATA,
|
|
)
|
|
return Volume(self.pool, vol)
|
|
|
|
def clone_volume(self, src: Volume, dst: VolumeConfig) -> Volume:
|
|
"""
|
|
Make storage volume copy.
|
|
|
|
:param src: Input volume
|
|
:param dst: Output volume config
|
|
"""
|
|
log.info(
|
|
'Start volume cloning '
|
|
'src_pool=%s src_vol=%s dst_pool=%s dst_vol=%s',
|
|
src.pool_name,
|
|
src.name,
|
|
self.pool.name(),
|
|
dst.name,
|
|
)
|
|
vol = self.pool.createXMLFrom(
|
|
dst.to_xml(), # new volume XML description
|
|
src.vol, # source volume virStorageVol object
|
|
flags=libvirt.VIR_STORAGE_VOL_CREATE_PREALLOC_METADATA,
|
|
)
|
|
if vol is None:
|
|
raise StoragePoolError
|
|
return Volume(self.pool, vol)
|
|
|
|
def get_volume(self, name: str) -> Volume | None:
|
|
"""Lookup and return Volume instance or None."""
|
|
log.info(
|
|
'Lookup for storage volume vol=%s in pool=%s',
|
|
name,
|
|
self.pool.name(),
|
|
)
|
|
try:
|
|
vol = self.pool.storageVolLookupByName(name)
|
|
return Volume(self.pool, vol)
|
|
except libvirt.libvirtError as e:
|
|
if e.get_error_code() == libvirt.VIR_ERR_NO_STORAGE_VOL:
|
|
raise VolumeNotFoundError(name) from e
|
|
log.exception('unexpected error from libvirt')
|
|
raise StoragePoolError(e) from e
|
|
|
|
def list_volumes(self) -> list[Volume]:
|
|
"""Return list of volumes in storage pool."""
|
|
return [Volume(self.pool, vol) for vol in self.pool.listAllVolumes()]
|