# -*- coding: utf-8 -*-
# vim:fenc=utf-8
# This file is part of the X2Go Project - https://www.x2go.org
# Copyright (C) 2012-2020 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de>
#
# X2Go Session Broker is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# X2Go Session Broker is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
"""\
The X2Go Broker Load Checker is an auxiliary X2Go Session Broker service
that checks the load on associated X2Go Servers at regular intervals.

The load of an X2Go Server is checked by the Load Checker if

* the X2Go Server is part of a multi-server session profile,
* the remote X2Go Server can be queried via X2Go Broker Agent, and
* the session profile (or the broker globally) is configured for
  background load checking (see global config parameter: ``load-checker``
  in ``/etc/x2go/x2gobroker.conf``).

In non-load-checker setups, multi-server session profiles perform a check
on all configured servers during the login phase of a user. On big server
farms, this check-them-all call to all members of the X2Go Server farm
can be really time-consuming.

The solution is to run the X2Go Broker Load Checker service on the broker
host and let it query server availability and load at regular intervals.
It collects the server metrics and stores them in memory. If the broker
receives a :func:`select_session()
<x2gobroker.brokers.base.X2GoBroker.select_session()>` request from an
X2Go client application, it will then negotiate with the load checker to
work out which X2Go Server is best suited for this incoming request.

On the X2Go Servers, the X2Go Broker Agent calculates a ``load_factor`` that
gets passed back to the X2Go Broker Load Checker when queried::

                  ( memAvail/1000 ) * numCPUs * typeCPUs
    load-factor = -------------------------------------- + 1
                        loadavg*100 * numSessions
"""
import copy
import importlib
import socket
import threading
import time

# X2Go Session Broker modules
import x2gobroker.agent
import x2gobroker.config
import x2gobroker.defaults
from x2gobroker.loggers import logger_broker
def _query_load_checker(request, send_msg):
    """\
    Send a single request line to the load checker service socket and
    return the service's complete answer.

    Internal helper of :func:`check_load`. The client socket is always
    closed before this function returns, also on errors.

    :param request: the request line (including its trailing newline)
    :type request: ``str``
    :param send_msg: debug message to log right before the request gets sent
    :type send_msg: ``str``

    :returns: the decoded answer of the load checker service, or ``None`` if
        connecting to the service socket or exchanging data with it failed
    :rtype: ``str`` or ``None``

    """
    socket_path = x2gobroker.defaults.X2GOBROKER_LOADCHECKER_SOCKET
    logger_broker.debug('loadchecker.check_load(): connecting to load checker service socket {socket}'.format(socket=socket_path))

    chunks = []
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        try:
            s.connect(socket_path)
        except socket.error as e:
            logger_broker.error('loadchecker.check_load(): failure when connecting to the load checker service socket {socket}: {errmsg}'.format(socket=socket_path, errmsg=str(e)))
            # nothing can be sent over an unconnected socket, so give up right away
            return None

        logger_broker.debug(send_msg)
        try:
            s.sendall(request.encode())
            # Half-close our side to signal "end of request", then read the
            # answer until the service closes the connection. A single recv()
            # call may truncate long answers (profiles with many servers).
            # NOTE(review): relies on the x2gobroker-loadchecker service
            # closing the connection after answering one request - confirm.
            s.shutdown(socket.SHUT_WR)
            while True:
                chunk = s.recv(1024)
                if not chunk:
                    break
                chunks.append(chunk)
        except socket.error as e:
            logger_broker.error('loadchecker.check_load(): failure when sending data to the load checker service socket {socket}: {errmsg}'.format(socket=socket_path, errmsg=str(e)))
            return None

    # decode only once all bytes are in, so multi-byte characters that got
    # split across recv() chunks are decoded correctly
    return b''.join(chunks).decode()


def check_load(backend, profile_id, hostname=None):
    """\
    This function gets called from the broker daemon's side whenever the
    broker needs information about associated X2Go Servers. It represents
    the client-side of the load checking process in X2Go Session Broker.

    It either sends a one-line 3-tuple::

        <backend>\\r<profile_id>\\r<hostname>\\n

    or a one-line 3-tuple with an empty hostname field::

        <backend>\\r<profile_id>\\r\\n

    to the ``X2GOBROKER_LOADCHECKER_SOCKET`` (see
    :mod:`x2gobroker.defaults`) and reads back either a single load factor
    (if the hostname was included in the query) or newline-separated
    ``<hostname>:<load_factor>`` lines (if only ``backend`` and
    ``profile_id`` had been given), which get converted into a Python
    dictionary.

    :param backend: the broker backend in use
    :type backend: ``str``
    :param profile_id: the session profile's ID
    :type profile_id: ``str``
    :param hostname: the X2Go Server's hostname as shown in
        ``x2golistsessions``'s output
    :type hostname: ``str``

    :returns: if ``hostname`` is given: the load factor of that server (as
        ``int``) or one of the strings ``LOAD-UNAVAILABLE`` /
        ``LOAD-DATA-BOGUS``; otherwise: a ``dict`` mapping the hostnames of
        the profile's member servers to their load factor (``int``) or to
        one of the strings ``HOST-UNREACHABLE``, ``LOAD-UNAVAILABLE``,
        ``LOAD-DATA-BOGUS`` (the ``dict`` is empty if the load checker
        service could not be queried or returned no data)
    :rtype: ``int``, ``str`` or ``dict``

    """
    if hostname is not None:
        response = _query_load_checker(
            '{backend}\r{profile_id}\r{hostname}\n'.format(backend=backend, profile_id=profile_id, hostname=hostname),
            'loadchecker.check_load(): sending backend={backend}, profile_id={profile_id}, hostname={hostname} to load checker service'.format(backend=backend, profile_id=profile_id, hostname=hostname),
        )
        # a failed exchange with the service counts as "load unavailable"
        load_factor = 'LOAD-UNAVAILABLE' if response is None else response

        if load_factor.startswith('LOAD-UNAVAILABLE'):
            logger_broker.warning('loadchecker.check_load(): load unavailable for backend={backend}, profile_id={profile_id}, hostname={hostname}'.format(backend=backend, profile_id=profile_id, hostname=hostname))
            return 'LOAD-UNAVAILABLE'

        try:
            load_factor = int(load_factor)
        except ValueError:
            logger_broker.warning('loadchecker.check_load(): load data for backend={backend}, profile_id={profile_id}, hostname={hostname} contained bogus (»{lf}«)'.format(backend=backend, profile_id=profile_id, hostname=hostname, lf=load_factor))
            return 'LOAD-DATA-BOGUS'

        logger_broker.info('loadchecker.check_load(): load factor for backend={backend}, profile_id={profile_id}, hostname={hostname} is: {lf}'.format(backend=backend, profile_id=profile_id, hostname=hostname, lf=load_factor))
        return load_factor

    raw_output = _query_load_checker(
        '{backend}\r{profile_id}\r\n'.format(backend=backend, profile_id=profile_id),
        'loadchecker.check_load(): sending backend={backend}, profile_id={profile_id} to load checker service'.format(backend=backend, profile_id=profile_id),
    ) or ""

    load_factors = {}
    for item in raw_output.split('\n'):
        if ":" not in item:
            continue
        # split at the LAST colon: valid values (ints or the status strings
        # below) never contain one, whereas hostnames (e.g. IPv6 addresses) may
        key, val = item.rsplit(':', 1)
        if val in ('HOST-UNREACHABLE', 'LOAD-UNAVAILABLE', 'LOAD-DATA-BOGUS'):
            load_factors[key] = val
            continue
        try:
            load_factors[key] = int(val)
        except ValueError:
            # report the host whose entry is broken (not the unset ``hostname`` argument)
            logger_broker.warning('loadchecker.check_load(): load data for backend={backend}, profile_id={profile_id}, hostname={hostname} contained bogus (»{lf}«)'.format(backend=backend, profile_id=profile_id, hostname=key, lf=val))
            load_factors[key] = 'LOAD-DATA-BOGUS'

    logger_broker.info('loadchecker.check_load(): load metrics for backend={backend}, profile_id={profile_id} are: {lf}'.format(backend=backend, profile_id=profile_id, lf=load_factors))
    return load_factors
class LoadChecker(threading.Thread):
    """\
    The :class:`LoadChecker` class provides the functionality of setting up
    a load checker service. It is the brain of the ``x2gobroker-loadchecker``
    executable.

    With it you can instantiate a new LoadChecker object for querying
    remote X2Go Broker Agent instances about server/system load, CPU
    usage, etc. at regular intervals. The collected values are kept in
    memory and can be retrieved via :meth:`get_server_load` and
    :meth:`get_profile_load`.

    :param config_file: global ``x2gobroker`` config file (defaults to
        ``x2gobroker.defaults.X2GOBROKER_CONFIG``)
    :type config_file: a :mod:`configparser` compliant ``<obj>``
    :param config_defaults: default (hard-coded) configuration parameters
        for all parameters missing in the ``config_file`` (defaults to
        ``x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS``)
    :type config_defaults: ``dict``
    :param logger: a :mod:`logging` instance (if ``None``, nothing gets logged)
    :type logger: ``<obj>``
    :param kwargs: any other parameters; they are passed on unchanged to the
        ``X2GoBroker`` class of every broker backend the load checker
        instantiates
    :type kwargs: ``dict``

    """
    def __init__(self, config_file=None, config_defaults=None, logger=None, **kwargs):
        self.logger = logger

        self.config_file = config_file
        if self.config_file is None:
            self.config_file = x2gobroker.defaults.X2GOBROKER_CONFIG
        self.config_defaults = config_defaults
        if self.config_defaults is None:
            self.config_defaults = x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS

        # handed on to each broker backend's X2GoBroker constructor
        self.kwargs = kwargs

        threading.Thread.__init__(self, target=self.loadchecker)

        # nested cache: server_load[backend][profile_id][hostname] -> value
        # returned by the remote agent query
        self.server_load = {}
        self.keep_alive = True
        self.daemon = True

    def get_server_load(self, backend, profile_id, hostname):
        """\
        Retrieve system load factor for a given server (via broker backend,
        profile ID and hostname).

        :param backend: broker backend to query.
        :type backend: ``str``
        :param profile_id: profile ID of the session profile to query
        :type profile_id: ``str``
        :param hostname: hostname of the X2Go Server
        :type hostname: ``str``

        :returns: the value stored for the given server by the last query
            of its X2Go Broker Agent (normally an ``int`` load factor), or
            ``None`` if nothing is stored for it
        :rtype: ``int``

        """
        try:
            return self.server_load[backend][profile_id][hostname]
        except KeyError:
            return None

    def get_profile_load(self, backend, profile_id):
        """\
        Retrieve system load factors for all member servers of the given
        profile ID (and the given broker backend).

        :param backend: broker backend to query.
        :type backend: ``str``
        :param profile_id: profile ID of the session profile to query
        :type profile_id: ``str``

        :returns: the internal hostname -> load factor mapping of the given
            profile ID (not a copy), or ``None`` if the profile is unknown
        :rtype: ``dict``

        """
        try:
            return self.server_load[backend][profile_id]
        except KeyError:
            return None

    def loadchecker(self):
        """\
        This is the actual thread runner of the :class:`LoadChecker`
        class.

        It queries configured / available X2Go Broker Agents at regular
        intervals about system load, CPU types and usage.

        In every cycle, all broker backends enabled in the configuration get
        (re-)instantiated and every remote agent of each profile that uses
        the load checker gets queried. A cycle that starts without a
        precomputed pause runs its queries back to back and then pauses for
        the full ``load-checker-intervals``. Otherwise, the cycle pauses
        ``load-checker-intervals / (number of queries + 1)`` seconds
        (derived from the previous cycle's number of queries) after each
        query and once more at its end, spreading the queries over the
        interval. Cache entries of vanished hosts, profiles and backends
        get removed.

        The loop ends once :attr:`keep_alive` is ``False`` (checked before
        each cycle).

        """
        time_to_sleep = 0

        self.config = x2gobroker.config.X2GoBrokerConfigFile(config_files=self.config_file, defaults=self.config_defaults)
        self.load_checker_intervals = self.config.get_value('global', 'load-checker-intervals')
        # backends are configured in [broker_<backend>] sections; collect the
        # <backend> part of every such section that has "enable" set
        self.broker_backends = [ "_".join(bs.split('_')[1:]) for bs in self.config.list_sections() if bs.startswith('broker_') and self.config.get_value(bs, 'enable') ]

        while self.keep_alive:

            # potentially, the X2Go Session Broker can manage different broker backends at the same time, so we initialize
            # all configured/enabled broker backends
            self.brokers = {}

            num_queries = 0
            num_failed_queries = 0

            if self.logger:
                self.logger.debug('LoadChecker.loadchecker(): load checker thread waking up...')

            for backend in self.broker_backends:

                if backend not in self.server_load:
                    self.server_load[backend] = {}

                # load x2gobroker.brokers.<backend>_broker dynamically
                _broker_backend_module = importlib.import_module('x2gobroker.brokers.{backend}_broker'.format(backend=backend))
                self.brokers[backend] = _broker_backend_module.X2GoBroker(config_file=self.config_file, config_defaults=self.config_defaults, **self.kwargs)

                profile_ids_to_check = [ pid for pid in self.brokers[backend].get_profile_ids() if self.brokers[backend].use_load_checker(pid) ]
                if self.logger:
                    self.logger.debug('LoadChecker.loadchecker(): backend={backend} -> processing profiles: {profile_ids}'.format(backend=backend, profile_ids=profile_ids_to_check))

                for profile_id in profile_ids_to_check:

                    if profile_id not in self.server_load[backend]:
                        self.server_load[backend][profile_id] = {}

                    remote_agents = self.brokers[backend].get_all_remote_agents(profile_id)
                    if self.logger:
                        self.logger.debug('LoadChecker.loadchecker(): querying remote agents for backend={backend}, profile_id={profile_id}: {remote_agents}'.format(backend=backend, profile_id=profile_id, remote_agents=remote_agents))

                    for remote_agent in remote_agents:
                        _load_factor = x2gobroker.agent.check_load(remote_agent, logger=self.logger)
                        num_queries += 1
                        if _load_factor is None:
                            if self.logger:
                                self.logger.info('LoadChecker.loadchecker(): backend={backend}, profile_id={profile_id}, hostname={hostname}, load factor not available'.format(backend=backend, profile_id=profile_id, hostname=remote_agent['hostname']))
                            num_failed_queries += 1
                        elif type(_load_factor) is int:
                            if self.logger:
                                self.logger.info('LoadChecker.loadchecker(): contacted remote broker agent for backend={backend}, profile_id={profile_id}, hostname={hostname}, new load factor is: {lf}'.format(backend=backend, profile_id=profile_id, hostname=remote_agent['hostname'], lf=_load_factor))
                        else:
                            if self.logger:
                                self.logger.warning('LoadChecker.loadchecker(): no load factor could be obtained for backend={backend}, profile_id={profile_id}, hostname={hostname}, reason: {reason}'.format(backend=backend, profile_id=profile_id, hostname=remote_agent['hostname'], reason=_load_factor))

                        self.server_load[backend][profile_id][remote_agent['hostname']] = _load_factor

                        if time_to_sleep > 0:
                            if self.logger:
                                self.logger.debug('LoadChecker.loadchecker(): sleeping for {secs}secs before querying next server'.format(secs=time_to_sleep))
                            time.sleep(time_to_sleep)

                    # clean up vanished hostnames (set built once, not per key)
                    _current_hostnames = set([ ra['hostname'] for ra in remote_agents ])
                    for hostname in list(self.server_load[backend][profile_id]):
                        if hostname not in _current_hostnames:
                            del self.server_load[backend][profile_id][hostname]

                # clean up vanished profile IDs (iterate over a snapshot of the keys while deleting)
                for profile_id in list(self.server_load[backend]):
                    if profile_id not in profile_ids_to_check:
                        del self.server_load[backend][profile_id]

            # clean up vanished backends (iterate over a snapshot of the keys while deleting)
            for backend in list(self.server_load):
                if backend not in self.broker_backends:
                    del self.server_load[backend]

            # don't do all queries every 300-or-so seconds, but distribute next round of queries over the
            # complete load_checker_intervals range
            if time_to_sleep == 0:
                if self.logger:
                    self.logger.debug('LoadChecker.loadchecker(): sleeping for {secs}secs before starting next query cycle'.format(secs=self.load_checker_intervals))
                time.sleep(self.load_checker_intervals)

            if num_queries > 0:
                if time_to_sleep > 0:
                    if self.logger:
                        self.logger.debug('LoadChecker.loadchecker(): performed {num} queries (failures: {num_failures}), sleeping for {secs}secs before starting next query cycle'.format(num=num_queries, num_failures=num_failed_queries, secs=time_to_sleep))
                    time.sleep(time_to_sleep)
                # num_queries pauses after the queries plus one at the end of the
                # cycle add up to one full load_checker_intervals
                time_to_sleep = self.load_checker_intervals / (num_queries +1)
            else:
                if self.logger:
                    self.logger.warning('LoadChecker.loadchecker(): performed {num} queries (failures: {num_failures}) in this cycle, if this message keeps repeating itself, consider disabling the X2Go Broker Load Checker daemon'.format(num=num_queries, num_failures=num_failed_queries))
                time_to_sleep = 0

    def stop_thread(self):
        """\
        Induce a stop of the running :class:`LoadChecker` thread.

        When stopped, no more queries to remote X2Go Servers will be
        made. The flag is only evaluated before each query cycle, so a
        cycle (including its pauses) that is already running completes
        first.

        """
        self.keep_alive = False