Source code for psec.secrets_environment

# -*- coding: utf-8 -*-

Secrets environment class and related variables, functions.

import json
import logging
import os
import re
import secrets  # noqa

from collections import OrderedDict
from pathlib import Path
from shutil import copy
from stat import S_IMODE
from typing import Union

from psec.exceptions import (
from psec.utils import (
from .factory import SecretFactory
from .handlers import *  # noqa: F401,F403

logger = logging.getLogger(__name__)

# This module relies in part on features from the xkcdpass module.
# Copyright (c) 2011 - 2019, Steven Tobin and Contributors.
# All rights reserved.

    {'descr': 'True', 'ident': 'true'},
    {'descr': 'False', 'ident': 'false'},

secret_factory = SecretFactory()
SECRET_TYPES = secret_factory.describe_secret_classes()

# FIXME: Left for backwards compatibility.
[docs]def is_generable(secret_type=None): """ Return boolean for generability of this secret type. """ return SecretFactory.get_handler(secret_type).is_generable()
# FIXME: Left for backwards compatibility.
[docs]def generate_secret(secret_type, **kwargs): """ Generate secret of the specified type. """ return SecretFactory.get_handler(secret_type).generate_secret(**kwargs)
[docs]class SecretsEnvironment(object): """ Class for handling secrets environment metadata. Provides an interface to the directory contents for a secrets environment, including groups descriptions, a tmp/ directory, and any other required directories. Typical usage example:: from psec.secrets_environment import SecretsEnvironment se = SecretsEnvironment(environment='env_name') Attributes: environment: Name of the environment. secrets_basedir: Base directory path to environment's storage. secrets_file: File name for storing secrets (defaults to 'secrets.json'). create_root: Controls whether the root directory is created on first use. defer_loading: Don't load values (just initialize attributes). export_env_vars: Export all variables to the environment. preserve_existing: Don't over-write existing environment variables. env_var_prefix: Prefix to apply to all exported environment variables. source: Directory path from which to clone a new environment. verbose_level: Verbosity level (pass from app args). """ # noqa logger = logging.getLogger(__name__)
[docs] def __init__( self, environment=None, secrets_basedir=None, secrets_file=None, create_root=False, defer_loading=False, export_env_vars=False, preserve_existing=False, env_var_prefix=None, source=None, verbose_level=1, ): """ Initialize secrets environment object. """ if secrets_file and secrets_basedir: raise RuntimeError( "[-] 'secrets_file' and 'secrets_basedir' are mutually " "exclusive when initializing a SecretsEnvironment()" ) self._environment = ( get_default_environment() if environment is None else environment ) if secrets_basedir is None: secrets_basedir = get_default_secrets_basedir() try: is_secrets_basedir(basedir=secrets_basedir, raise_exception=True) self._secrets_basedir = Path(secrets_basedir) except BasedirNotFoundError: if create_root: self._secrets_basedir = secrets_basedir_create( basedir=secrets_basedir ) else: raise if secrets_file is not None: self._secrets_file = Path(secrets_file) if len( < 3: # This might not exactly be true in all cases, but I don't # have time to run down all possible use cases (or put in # all necessary checks) at the moment. raise RuntimeError( '[-] the path to a secrets file should have ' f"at least 3 components ('{self._secrets_file}')" ) if not self._secrets_file.exists(): raise RuntimeError( f"[-] the specified secrets file '{secrets_file}' " 'does not exist' ) if self._environment !=[-2]: raise RuntimeError( f"[-] environment name '{environment}' does not " f"match secrets file path '{secrets_file}'" ) if Path(secrets_basedir) not in self._secrets_file.parents: raise RuntimeError( f"[-] base directory '{secrets_basedir}' does not " f"match secrets file path '{secrets_file}'" ) self.secrets_basedir = self._secrets_file.parents[1] else: self._secrets_file = Path(self._secrets_basedir) / str(self._environment) / SECRETS_FILE # noqa self._secrets_descriptions = self._secrets_file.parent / SECRETS_DESCRIPTIONS_DIR # noqa self._verbose_level = verbose_level self.export_env_vars = export_env_vars self.preserve_existing = preserve_existing # When exporting environment variables, include one that specifies the # environment from which these variables were derived. This also works # around a limitation in Ansible where the current working directory # from which "ansible" was run. (The D2 lookup_plugin/ # script needs to communicate this back to python_secrets in order for # it's .python_secrets_environment file to be used to identify the # proper environment.) if self.export_env_vars is True: os.environ['D2_ENVIRONMENT'] = self._environment # Deprecating this variable name: os.environ['PYTHON_SECRETS_ENVIRONMENT'] = self._environment self.env_var_prefix = env_var_prefix # Secrets attribute maps; anything else throws exception for a in SECRET_ATTRIBUTES: setattr(self, a, OrderedDict()) if source is not None: self.clone_from(source) self.read_secrets_descriptions() self._secrets = OrderedDict() self._descriptions = OrderedDict() self._changed = False for attribute in SECRET_ATTRIBUTES: self.__dict__[attribute] = {}
[docs] def __str__(self): """Produce string representation of environment identifier""" return str(self._environment)
@property def verbose_level(self): """Returns the verbosity level.""" return self._verbose_level
[docs] def changed(self): """Return boolean reflecting changed secrets.""" return self._changed
# TODO(dittrich): FIX Cere call
[docs] def get_secrets_descriptions_dir(self): """Return the path to the drop-in secrets description directory""" _env = self._environment if not _env: return self.get_secrets_basedir() else: return self.get_secrets_basedir() / self.get_secrets_basename().replace('.json', '.d') # noqa
[docs] def get_secrets_basename(self): """Return the basename of the current secrets file""" return os.path.basename(self._secrets_file)
[docs] def get_secrets_basedir(self, init=False, mode=DEFAULT_MODE): """ Returns the directory path root for secrets storage and definitions. When more than one environment is being used, a single top-level directory in the user's home directory is the preferred location. This function checks to see if such a directory exists, and if so defaults to that location. If the environment variable "D2_SECRETS_BASEDIR" is set, that location is used instead. """ try: secrets_basedir = self._secrets_basedir except AttributeError: secrets_basedir = get_default_secrets_basedir() if init: secrets_basedir_create( basedir=secrets_basedir, mode=mode, ) else: is_secrets_basedir(basedir=secrets_basedir, raise_exception=True) return secrets_basedir
[docs] def secrets_basedir_exists(self): """Return whether secrets root directory exists""" return self._secrets_basedir.exists()
[docs] def get_environment_path(self, env=None, subdir=None, host=None): """Returns the absolute path to secrets environment directory or subdirectories within it""" if env is None: env = self._environment _path = self.get_secrets_basedir() / str(env) if not (subdir is None and host is None): valid_subdir = r'a-zA-Z0-9_/' invalid_subdir = re.compile(f'[^{valid_subdir}]') valid_host = r'a-zA-Z0-9_\./' # noqa invalid_host = re.compile(f'[^{valid_host}]') if subdir is None and host is not None: raise RuntimeError( '[-] Must specify subdir when specifying host') if subdir is not None: if subdir.startswith('/'): raise RuntimeError('[-] subdir may not start with "/"') elif subdir.endswith('/'): raise RuntimeError('[-] subdir may not end with "/"') if not bool( _path = _path / subdir else: raise RuntimeError( "[-] invalid character in subdir: " f"must be in [{valid_subdir}]" ) if host is not None: if not bool( _path = _path / host else: raise RuntimeError( "[-] invalid character in host: " f"must be in [{valid_host}]") return Path(_path)
[docs] def environment_exists(self, env=None, path_only=False): """Return whether secrets environment directory exists and contains files other than 'tmp' directory.""" _ep = self.get_environment_path(env=env) result = self.get_descriptions_path().is_dir() if not result and _ep.exists(): if path_only: result = True else: _files = list() for root, _, filenames in os.walk(_ep): for filename in filenames: if filename != 'tmp': _files.append(os.path.join(root, filename)) result = len(_files) > 0 return result
[docs] def environment_create( self, source=None, alias=False, mode=DEFAULT_MODE, ): """Create secrets environment directory""" env_path = self.get_environment_path() if not alias: # Create a new environment (optionally from an # existing environment) if self.environment_exists(): raise PsecEnvironmentAlreadyExistsError( environment=self._environment ) os.makedirs(env_path, exist_ok=True, mode=mode) if source is not None: self.clone_from(source) else: # Just create an alias (symbolic link) to # an existing environment if self.environment_exists(): raise RuntimeError( f"[-] environment '{self._environment}' " "already exists") source_env = SecretsEnvironment(environment=source) # Create a symlink with a relative path os.symlink(str(source_env), env_path)
[docs] def get_secrets_file_path(self, env=None): """Returns the absolute path to secrets file""" if env is None: env = self._environment return self.get_secrets_basedir() / str(env) / self._secrets_file
[docs] def secrets_file_path_exists(self): """Return whether secrets file exists""" return self.get_secrets_file_path().exists()
[docs] def get_descriptions_path( self, root=None, group=None, create=False, mode=DEFAULT_MODE ): """Return path to secrets descriptions directory or file.""" if root is not None: path = Path(root) / self._secrets_descriptions else: path = self.get_environment_path() / self._secrets_descriptions if create: path.mkdir(parents=True, exist_ok=True, mode=mode) if group is not None: path = path / f"{group}.json" return path
[docs] def get_tmpdir_path(self, create_path=False): """Return the absolute path to secrets descriptions tmp directory""" tmpdir = self.get_environment_path() / "tmp" if create_path: tmpdir_mode = 0o700 try: os.makedirs(tmpdir, tmpdir_mode)"[+] created tmpdir %s", tmpdir) except FileExistsError: mode = os.stat(tmpdir).st_mode current_mode = S_IMODE(mode) if current_mode != tmpdir_mode: os.chmod(tmpdir, tmpdir_mode) "[+] changed mode on %s from %s to %s", oct(current_mode), oct(tmpdir_mode), tmpdir ) return tmpdir
[docs] def requires_environment(self, path_only=False): """ Provide consistent error handling for any commands that require an environment actually exist in order to work properly. """ if not self.environment_exists(path_only=path_only): raise RuntimeError( f"[-] environment '{self._environment}' " f"does not exist in {self._secrets_basedir} " "or is empty")
[docs] def keys(self): """Return the keys to the secrets dictionary""" return [s for s in self._secrets.keys()]
[docs] def items(self): """Return the items from the secrets dictionary.""" return self._secrets.items()
[docs] def get_secret(self, secret, allow_none=False): """Get the value of secret Args: secret (string): Name of the secret to get allow_none (boolean): Allow returning ``None`` Returns: string: The value of the secret Raises: SecretNotFoundError: If value is ``None`` and ``allow_none`` is ``False`` """ if secret is None: raise RuntimeError('[-] must specify secret to get') v = self._secrets.get(secret, None) if v is None and not allow_none: raise SecretNotFoundError(secret=secret) return v
[docs] def get_secret_export(self, secret): """Get the specified environment variable for exporting secret :param secret: :type: string :return: environment variable for exporting secret """ return self.Export.get(secret, secret) # type: ignore
def _set_secret(self, secret, value): """Set secret to value and export environment variable :param secret: :type: string :param value: :type: string :return: """ self._secrets[secret] = value # DEPRECATED getattr(self, 'Variable')[secret] = value if self.export_env_vars: if self.preserve_existing and bool(os.getenv(secret)): raise RuntimeError( "[-] refusing to overwrite environment " f"variable '{secret}'") # Export with secrets name first. os.environ[secret] = str(value) # See if an alternate environment variable name is # defined and also export as that. # TODO(dittrich): Support more than one, someday, maybe? _env_var = self.get_secret_export(secret) if _env_var is None: if self.env_var_prefix is not None: _env_var = f'{self.env_var_prefix}{secret}' else: _env_var = secret if self.preserve_existing and bool(os.getenv(_env_var)): raise RuntimeError( "[-] refusing to overwrite environment " f"variable '{_env_var}'") os.environ[_env_var] = str(value)
[docs] def set_secret(self, secret, value=None): """Set secret to value and record change :param secret: :type: string :param value: :type: string :return: """ self._set_secret(secret, value) # DEPRECATED getattr(self, 'Variable', {secret: value}) self._changed = True
[docs] def delete_secret(self, secret): """Delete a secret and record change. :param secret: :type: string :return: """ try: del self.Variable[secret] # type: ignore del self.Type[secret] del self._secrets[secret] except KeyError: pass else: self._changed = True
[docs] def get_type(self, variable): """Return type for variable or None if no description""" return self.Type.get(variable, None) # type: ignore
[docs] def get_default_value(self, variable): """Return the default value from the Options attribute""" try: values = self.Options.get(variable).split(',') # type: ignore except AttributeError: values = [] return ( values[0] if len(values) > 0 and values[0] != '*' else '' )
[docs] def read_secrets_and_descriptions(self, ignore_errors=False): """Read secrets descriptions and secrets.""" self.read_secrets_descriptions(ignore_errors=ignore_errors) self.read_secrets(from_descriptions=True) self.find_new_secrets()
[docs] def find_new_secrets(self): """ Ensure that any new secrets defined in description files are called out and/or become new undefined secrets. :return: """ # TODO(dittrich): Replace this with simpler use of attribute maps for group in self._descriptions.keys(): for i in self._descriptions[group]: if len(i) == 0: raise RuntimeError( f"[-] found empty dictionary item in group '{group}'") s = i['Variable'] t = i['Type'] if self.get_secret(s, allow_none=True) is None: if self.verbose_level > 1: self.logger.warning( "[!] new %s variable '%s' is unset", t, s ) self._set_secret(s, None)
[docs] def read_secrets(self, from_descriptions=False): """ Load the current secrets file. If no secrets have been set yet and from_descriptions is True, return a dictionary comprised of the keys from the descriptions dictionary defined to be None and set self._changed to ensure these are written out. """ _fname = self.get_secrets_file_path() yaml_fname = _fname.parent / f"{_fname.stem}.yml" # TODO(dittrich): Add upgrade feature... some day. # Until then, reference a way for anyone affected to manually # convert files. if yaml_fname.exists(): raise RuntimeError( f"[-] old YAML style file '{yaml_fname}' found:\n" f"[-] see ``psec utils yaml-to-json --help`` for " "information about converting to JSON") self.logger.debug("[+] reading secrets from '%s'", str(_fname)) try: _secrets = json.loads( _fname.read_text(), object_pairs_hook=OrderedDict, ) for k, v in _secrets.items(): self._set_secret(k, v) except FileNotFoundError as err: if from_descriptions: for group in self._descriptions.keys(): for i in self._descriptions[group]: self._set_secret(i['Variable'], None) # Ensure these get written out to create a secrets file. self._changed = True else: raise err return self
[docs] def write_secrets(self): """Write out the current secrets if any changes were made""" if self._changed: _fname = self.get_secrets_file_path() self.logger.debug("[+] writing secrets to '%s'", _fname) with'w', encoding='utf-8') as f: json.dump(self._secrets, f, indent=2) # type: ignore f.write('\n') self._changed = False remove_other_perms(_fname) else: self.logger.debug('[-] not writing secrets (unchanged)')
[docs] def clone_from(self, src: Union[Path, str]): """ Clone from existing definition file(s) The source can be (a) a directory full of one or more group descriptions, (b) a single group descriptions file, or (c) an existing environment's descriptions file(s). """ if isinstance(src, Path): if src.is_dir() and src.suffix != '.d': raise RuntimeError( "[-] refusing to process a directory without " f"a '.d' extension ('{str(src)}')") elif src.is_file() and src.suffix != '.json': raise RuntimeError( "[-] refusing to process a file without " f"a '.json' extension ('{str(src)}')") else: if self.environment_exists(env=src): # Only copy descriptions when cloning from environment. src_env = SecretsEnvironment(environment=src) src = src_env.get_descriptions_path() else: src = Path(src) if not src.exists(): raise RuntimeError( f"[-] directory or environment '{src}' does not exist") dest = self.get_descriptions_path(create=True) if src.suffix == '.d': # Only copy descriptions when cloning from environment. copydescriptions(src, dest) else: # Copy just the one file when cloning from a file. copy(src, dest) remove_other_perms(dest) self.read_secrets_descriptions() self.find_new_secrets()
[docs] def read_descriptions(self, infile=None, group=None): """ Read a secrets group description file and return a dictionary if valid. :param infile: :param group: :return: dictionary of descriptions """ if group is not None: # raise RuntimeError('[!] no group specified') infile = self.get_descriptions_path(group=group) if infile is None: raise RuntimeError( '[!] must specify an existing group or file to read') with open(infile, 'r') as f: data = json.load(f, object_pairs_hook=OrderedDict) for d in data: for k in d.keys(): if k not in SECRET_ATTRIBUTES: raise RuntimeError( f"[-] invalid attribute '{k}' in '{infile}'") return data
[docs] def write_descriptions( self, data={}, group=None, mode=DEFAULT_MODE, mirror_to=None ): """Write out the secrets descriptions to a file.""" if group is None: raise RuntimeError('[!] no group specified') outfiles = [self.get_descriptions_path(group=group)] if mirror_to is not None: outfiles.append( self.get_descriptions_path( root=mirror_to, group=group, ) ) for outfile in outfiles: os.makedirs(os.path.dirname(outfile), exist_ok=True, mode=mode) with open(outfile, 'w') as f: f.write(json.dumps(data, indent=2)) f.write('\n')
[docs] def check_duplicates(self, data=None): """ Check to see if any 'Variable' dictionary elements in list match any already defined variables. If so, raise RuntimeError(). :param data: list of dictionaries containing secret descriptions :return: None """ if isinstance(data, list): for d in data: v = d.get('Variable') if v in self._secrets: raise RuntimeError( f"[-] variable '{v}' duplicates an existing variable" )
[docs] def read_secrets_descriptions( self, ignore_errors=False, ): """Load the descriptions of groups of secrets from a .d directory""" groups_dir = self.get_descriptions_path() if not groups_dir.exists(): if not ignore_errors: '[-] secrets descriptions directory not found' ) else: # Ignore .order file and any other file extensions extensions = ['.json'] file_names = [ fn for fn in groups_dir.iterdir() if fn.suffix in extensions ] self.logger.debug( "[+] reading secrets descriptions from '%s'", groups_dir) # Iterate over files in directory, loading them into # dictionaries as dictionary keyed on group name. if len(file_names) == 0 and not ignore_errors:'[-] no secrets descriptions files found') for fname in file_names: group = fname.stem if '.' in group: raise RuntimeError( f"[-] group name cannot include '.': '{group}'") descriptions = self.read_descriptions(group=group) if descriptions is not None: self._descriptions[group] = descriptions # Dynamically create maps keyed on variable name # for simpler lookups. (See the get_prompt() method # for an example.) # {'Prompt': 'Google OAuth2 username', 'Type': 'string', 'Variable': 'google_oauth_username'} # noqa for d in descriptions: # TODO(dittrich): # noqa self.Group[d['Variable']] = group # type: ignore pylint: disable=no-member # noqa for k, v in d.items(): try: # Add to existing map getattr(self, k)[d['Variable']] = v except AttributeError: raise RuntimeError( f"[-] '{k}' is not a valid attribute") else: raise RuntimeError( f"[-] descriptions for group '{group}' is empty")
[docs] def descriptions(self): return self._descriptions
[docs] def get_secret_type(self, variable): """Get the Type of variable from set of secrets descriptions""" for g in self._descriptions.keys(): i = find( self._descriptions[g], 'Variable', variable ) if i is not None: try: return self._descriptions[g][i]['Type'] except KeyError: return None return None
[docs] def get_options(self, secret): """Get the options for setting the secret""" return self.Options.get(secret, '*') # type: ignore
[docs] def get_help(self, secret): """Get the help documentation URL for the secret""" return self.Help.get(secret, '*') # type: ignore
[docs] def get_prompt(self, secret): """Get the prompt for the secret""" return self.Prompt.get(secret, secret) # type: ignore
# TODO(dittrich): Not very DRY (but no time now)
[docs] def get_secret_arguments(self, variable): """Get the Arguments of variable from set of secrets descriptions""" for g in self._descriptions.keys(): i = find( self._descriptions[g], 'Variable', variable ) if i is not None: try: return self._descriptions[g][i]['Arguments'] except KeyError: return {} return {}
[docs] def get_items_from_group(self, group): """Get the variables in a secrets description group""" try: return [i['Variable'] for i in self._descriptions[group]] except KeyError: return []
[docs] def is_item_in_group(self, item, group): """Return true or false based on item being in group""" return find( self._descriptions[group], 'Variable', item) is not None
[docs] def get_group(self, item): """Return the group to which an item belongs.""" try: return self.Group[item] # type: ignore except (KeyError, AttributeError): return None
[docs] def get_groups(self): """Get the secrets description groups""" return [g for g in self._descriptions]
# vim: set ts=4 sw=4 tw=0 et :