Source code for x2gobroker.web.uccs

# -*- coding: utf-8 -*-
# vim:fenc=utf-8

# This file is part of the  X2Go Project - https://www.x2go.org
# Copyright (C) 2012-2020 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de>
#
# X2Go Session Broker is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# X2Go Session Broker is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.

# modules
import datetime
import random
import tornado.web

# Python X2Go Broker modules
import x2gobroker.defaults

from x2gobroker.loggers import logger_broker, logger_error
import x2gobroker.uccsjson
import x2gobroker.basicauth


def credentials_validate(username, password):
    """\
    Check a username/password pair against the base broker backend.

    If the first check fails and the username contains an ``@``, the check
    is repeated with only the part before the first ``@``.

    :param username: the username
    :type username: ``str``
    :param password: the user's password
    :type password: ``str``

    :returns: ``(<username>, <success>)`` tuple; the returned username may
        differ from the one passed in (``@`` suffix stripped, or replaced
        by ``anonymous`` for the ``check-credentials`` / ``FALSE`` probe)
    :rtype: ``(str, bool)``

    """
    import x2gobroker.brokers.base_broker

    # FIXME: with the below hack, the backend broker detection in X2GoBrokerWeb is disabled, only global options
    #        from x2gobroker.conf are available here...
    base_broker = x2gobroker.brokers.base_broker.X2GoBroker()
    base_broker.enable()

    # check_access() returns an (access, next_cookie) pair; only the access
    # flag is used in this function.
    granted, _unused_cookie = base_broker.check_access(username=username, password=password)

    # UCCS only allows email addresses for remote login
    if not granted and "@" in username:
        username = username.split('@')[0]
        granted, _unused_cookie = base_broker.check_access(username=username, password=password)

    # The special probe pair is reported under the 'anonymous' user name.
    is_probe = username == 'check-credentials' and password == 'FALSE'
    if is_probe:
        username = 'anonymous'

    return username, granted
class _RequestHandler(tornado.web.RequestHandler):
    """\
    Request handler base class that reports request exceptions through
    the broker's error logger.

    """

    def _handle_request_exception(self, e):
        # Format first, then hand the finished message to the error logger.
        message = 'HTTP request error: {error_msg}'.format(error_msg=e)
        logger_error.error(message)
class X2GoBrokerWeb(_RequestHandler):
    """\
    Handler for the UCCS base URL; answers the is-alive ping sent by UCCS
    clients.

    """

    def get(self, path):
        """\
        With the UCCS protocol, a HEAD request is sent to the server's
        base URL (sort of as an is-alive ping).

        If X2Go Session Broker runs in debug mode, the processing of the
        HEAD request response is also made available to GET requests.
        This means, you can open the base URL in a normal web browser and
        get a reply.

        :param path: URL path
        :type path: ``str``

        :raises tornado.web.HTTPError: a 405 error is raised if the broker
            does not run in debug mode

        """
        if x2gobroker.defaults.X2GOBROKER_DEBUG:
            # Logger.warn() is a deprecated alias that newer Python
            # versions no longer provide; warning() is the supported name.
            logger_broker.warning('GET http request detected, if unwanted: disable X2GOBROKER_DEBUG')
            return self.head(path)
        raise tornado.web.HTTPError(405)

    def head(self, path):
        """\
        With the UCCS protocol, a HEAD request is sent to the server's
        base URL (sort of as an is-alive ping).

        The current UTC timestamp is written as the response.

        :param path: URL path
        :type path: ``str``

        """
        # datetime.utcnow() is deprecated. Take an aware UTC timestamp and
        # drop the tzinfo again so the rendered string keeps the exact
        # format of the former naive timestamp (no "+00:00" suffix).
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        self.write(str(now_utc))
        return
@x2gobroker.basicauth.require_basic_auth('Authentication required', credentials_validate)
class X2GoBrokerWebAPI(tornado.web.RequestHandler):
    """\
    HTTP request handler that provides the UCCS web frontend of the X2Go
    Session Broker.

    Currently, Arctica Greeter with Remote Logon Service uses this
    webfrontend / communication protocol format.

    :raises tornado.web.HTTPError: a 401 error is raised if no
        authenticated user name is handed to the request; a 404 error is
        raised if the broker backend is unknown or disabled, if UCCS
        output is disabled, or if no client address is available outside
        debug mode.

    """
    # NOTE(review): the requested API version is not checked against
    # ``api_versions_supported`` anywhere in this class -- confirm whether
    # unsupported versions are meant to be rejected.

    def __init__(self, *args, **kwargs):
        # latest API version is 5
        self.api_version = 5
        self.api_versions_supported = [4, 5]
        self.latest_api_version = max(self.api_versions_supported)
        tornado.web.RequestHandler.__init__(self, *args, **kwargs)

    # HTTP headers that are set on every API response
    http_header_items = {
        'Content-Type': 'text/plain; charset=utf-8',
        'Expires': '+1h',
    }

    def _gen_http_header(self):
        """Set all headers from ``http_header_items`` on the response."""
        for header_name, header_value in self.http_header_items.items():
            self.set_header(header_name, header_value)

    def _load_broker_backend(self):
        """\
        Import the broker module named by ``self.backend`` and return a
        fresh ``X2GoBroker`` instance from it.

        :raises tornado.web.HTTPError: a 404 error is raised if the backend
            name is not a valid module name or the module cannot be
            imported

        """
        # function-scope import, in line with credentials_validate() above
        import importlib

        module_name = 'x2gobroker.brokers.{backend}_broker'.format(backend=self.backend)

        # The backend name is taken from the request URL. Only accept
        # names an ``import`` statement would accept, and never evaluate
        # the URL-derived string as code.
        if not all(part.isidentifier() for part in module_name.split('.')):
            raise tornado.web.HTTPError(404)

        try:
            # dynamically detect broker backend from given URL
            return importlib.import_module(module_name).X2GoBroker()
        except ImportError:
            # throw a 404 if the backend does not exist
            raise tornado.web.HTTPError(404)

    def _build_terminal_server(self, profile_id, profile):
        """\
        Turn one session profile into a UCCS terminal server object.

        Missing ``name`` and ``sshport`` keys are filled in on the given
        profile before use.

        :param profile_id: ID of the session profile
        :param profile: the session profile as returned by the broker
            backend (accessed like a ``dict``)

        :returns: an ``RDPServer`` if the profile's ``directrdp`` value is
            true, otherwise an ``X2GoServer``
        :raises X2GoBrokerProfileException: if the profile has no ``host``
            key

        """
        # obligatory profile keys: validate before the first access, so a
        # broken profile is reported as such rather than as a KeyError
        # NOTE(review): x2gobroker.x2gobroker_exceptions is not among the
        # imports visible at the top of this module; presumably it gets
        # loaded by another x2gobroker module -- confirm.
        if 'host' not in profile:
            raise x2gobroker.x2gobroker_exceptions.X2GoBrokerProfileException('Session profile ID \'{profile_id}\' lacks \'host\' key; profile is unusable'.format(profile_id=profile_id))
        if 'name' not in profile:
            profile['name'] = profile_id

        hosts = profile['host']
        if isinstance(hosts, str):
            hosts = [hosts]

        if profile['directrdp']:
            ts = x2gobroker.uccsjson.RDPServer(
                host='{hostname}'.format(hostname=hosts[0]),
                name=profile['name'],
                username=profile['user'],
                api_version=self.api_version,
            )
            ts.set_domain('LOCAL')
            return ts

        if 'sshport' not in profile:
            profile['sshport'] = 22

        # pick one of the profile's hosts at random
        _hostname = random.choice(hosts)
        _port = profile['sshport']

        # per-host overrides: the port is looked up with the host name
        # chosen above, before that name itself may get replaced
        _sshport_key = 'sshport={hostname}'.format(hostname=_hostname)
        if _sshport_key in profile:
            _port = profile[_sshport_key]
        _host_key = 'host={hostname}'.format(hostname=_hostname)
        if _host_key in profile:
            _hostname = profile[_host_key]

        ts = x2gobroker.uccsjson.X2GoServer(
            host='{hostname}:{port}'.format(hostname=_hostname, port=_port),
            name=profile['name'],
            username=profile['user'],
            api_version=self.api_version,
        )

        _cmd = profile['command']
        if _cmd.upper() in x2gobroker.defaults.X2GO_DESKTOP_SESSIONS:
            _cmd = _cmd.upper()

        # API version 4 gets a session type, everything else a command
        if self.api_version == 4:
            ts.set_session_type(_cmd)
        else:
            ts.set_command(_cmd)
        return ts

    def get(self, *args, **kwargs):
        """\
        Implementation of the UCCS broker API versions 4 (final) and 5 (in
        development).

        The JSON tree of the terminal servers available to the
        authenticated user is written as the response.

        :param args: positional URL captures: ``args[0]`` is the broker
            backend name (the default backend is used if it is empty),
            ``args[1]`` is the requested API version
        :param kwargs: must contain ``basicauth_user``, the name of the
            authenticated user

        :raises tornado.web.HTTPError: 401 if ``basicauth_user`` is
            missing; 404 if the backend is unknown or disabled, if UCCS
            output is disabled, or if no client address is available
            outside debug mode

        """
        self._gen_http_header()

        backend = args[0]
        api_version = args[1]
        try:
            self.api_version = int(api_version)
        except (TypeError, ValueError):
            # assume latest API, shouldn't we actually throw an error here?
            self.api_version = self.latest_api_version

        if not backend:
            self.backend = x2gobroker.defaults.X2GOBROKER_DEFAULT_BACKEND
        else:
            self.backend = backend.rstrip('/')

        self.broker_backend = self._load_broker_backend()

        global_config = self.broker_backend.get_global_config()

        # throw a 404 if the WebUI is not enabled
        if not global_config['enable-uccs-output']:
            raise tornado.web.HTTPError(404)

        # if the broker backend is disabled in the configuration, pretend to have nothing on offer
        if not self.broker_backend.is_enabled():
            raise tornado.web.HTTPError(404)

        # set the client address for the broker backend
        ip = self.request.remote_ip
        if ip:
            self.broker_backend.set_client_address(ip)
            logger_broker.info('client address is {address}'.format(address=ip))
        elif not x2gobroker.defaults.X2GOBROKER_DEBUG:
            # if the client IP is not set, we pretend to have nothing on offer
            logger_error.error('client could not provide an IP address, pretending: 404 Not Found')
            raise tornado.web.HTTPError(404)

        ###
        ### CONFIRM SUCCESSFUL AUTHENTICATION FIRST
        ###

        try:
            username = kwargs['basicauth_user']
        except KeyError:
            raise tornado.web.HTTPError(401)

        logger_broker.debug('Authenticated as username: {username}, with password: <hidden>'.format(username=username))

        ###
        ### GENERATE UCCS JSON TREE
        ###

        profile_ids = self.broker_backend.get_profile_ids_for_user(username)
        urlbase = self.broker_backend.get_global_value('my-uccs-url-base').rstrip('/')
        ms = x2gobroker.uccsjson.ManagementServer('{urlbase}/uccs/{backend}/'.format(urlbase=urlbase, backend=backend), 'X2Go Session Broker')

        profile_ids.sort()

        for profile_id in profile_ids:
            profile = self.broker_backend.get_profile_for_user(profile_id, username, broker_frontend='uccs')
            ts = self._build_terminal_server(profile_id, profile)
            ms.add_terminalserver(ts)
            # Set inside the loop: the last server added ends up as the
            # default, and a user without any profiles never reaches a
            # reference to an undefined server object.
            ms.set_default(ts.Name)

        self.write(ms.toJson())
        return