Source code for arbiterd.common.cpu

# -*- coding: utf-8 -*-
# Copyright 2021 - 2021, Sean Mooney and the arbiterd contributors
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import typing as ty

from arbiterd.common import filesystem

LOG = logging.getLogger(__name__)


[docs]def parse_cpu_spec(spec: str) -> ty.Set[int]: # derived from nova.virt.hardware # https://github.com/openstack/nova/blob/8f250f5/nova/virt/hardware.py#L96-L155 """Parse a CPU set specification. Each element in the list is either a single CPU number, a range of CPU numbers, or a caret followed by a CPU number to be excluded from a previous range. :param spec: cpu set string eg "1-4,^3,6" :returns: a set of CPU indexes """ cpuset_ids: ty.Set[int] = set() cpuset_reject_ids: ty.Set[int] = set() for rule in spec.split(','): rule = rule.strip() # Handle multi ',' if len(rule) < 1: continue # Note the count limit in the .split() call range_parts = rule.split('-', 1) if len(range_parts) > 1: reject = False if range_parts[0] and range_parts[0][0] == '^': reject = True range_parts[0] = str(range_parts[0][1:]) # So, this was a range; start by converting the parts to ints start, end = [int(p.strip()) for p in range_parts] # Make sure it's a valid range if start > end: raise ValueError(f'Invalid range expression: {rule}') # Add available CPU ids to set if not reject: cpuset_ids |= set(range(start, end + 1)) else: cpuset_reject_ids |= set(range(start, end + 1)) elif rule[0] == '^': # Not a range, the rule is an exclusion rule; convert to int cpuset_reject_ids.add(int(rule[1:].strip())) else: # OK, a single CPU to include; convert to int cpuset_ids.add(int(rule)) # Use sets to handle the exclusion rules for us cpuset_ids -= cpuset_reject_ids return cpuset_ids
AVAILABLE_PATH = 'devices/system/cpu/present'
[docs]def get_available_cpus() -> ty.Set[int]: return parse_cpu_spec(filesystem.read_sys(AVAILABLE_PATH)) or set()
ONLINE_PATH = 'devices/system/cpu/online'
[docs]def get_online_cpus() -> ty.Set[int]: return parse_cpu_spec(filesystem.read_sys(ONLINE_PATH)) or set()
OFFLINE_PATH = 'devices/system/cpu/offline'
[docs]def get_offline_cpus() -> ty.Set[int]: return parse_cpu_spec(filesystem.read_sys(OFFLINE_PATH)) or set()
[docs]def nproc() -> int: return len(get_available_cpus())
CPU_PATH_TEMPLATE = 'devices/system/cpu/cpu'
[docs]def gen_cpu_path(core: int) -> str: if not exists(core): err = ValueError(f'CPU: {core} does not exist') LOG.debug(f'attempt to access cpu: {core}, error: {err}') raise err sys = filesystem.get_sys_fs_mount() return str(os.path.join(sys, f'{CPU_PATH_TEMPLATE}{core}'))
[docs]def gen_cpu_paths() -> ty.Iterable[str]: sys = filesystem.get_sys_fs_mount() for core in range(nproc()): yield str(os.path.join(sys, f'{CPU_PATH_TEMPLATE}{core}'))
[docs]def exists(cpu: int) -> bool: return cpu in get_available_cpus()
[docs]def get_online(cpu_path: str) -> bool: online = filesystem.read_sys( os.path.join(cpu_path, 'online'), default='1').strip() return online == '1'
[docs]def set_online(cpu_path: str) -> bool: online = get_online(cpu_path) if online: return True filesystem.write_sys(os.path.join(cpu_path, 'online'), data='1') return get_online(cpu_path)
[docs]def set_offline(cpu_path: str) -> bool: online = get_online(cpu_path) if not online: return True filesystem.write_sys(os.path.join(cpu_path, 'online'), data='0') return not get_online(cpu_path)