GIF89a;
EcchiShell v1.0
/
/
proc/
self/
root/
usr/
local/
letsencrypt/
1:
acc = display_ops.choose_account(accounts)
elif len(accounts) == 1:
acc = accounts[0]
else: # no account registered yet
if config.email is None and not config.register_unsafely_without_email:
config.email = display_ops.get_email()
def _tos_cb(regr):
if config.tos:
return True
msg = ("Please read the Terms of Service at {0}. You "
"must agree in order to register with the ACME "
"server at {1}".format(
regr.terms_of_service, config.server))
obj = zope.component.getUtility(interfaces.IDisplay)
return obj.yesno(msg, "Agree", "Cancel",
cli_flag="--agree-tos", force_interactive=True)
try:
acc, acme = client.register(
config, account_storage, tos_cb=_tos_cb)
except errors.MissingCommandlineFlag:
raise
except errors.Error as error:
logger.debug(error, exc_info=True)
raise errors.Error(
"Unable to register an account with ACME server")
config.account = acc.id
return acc, acme
def _init_le_client(config, authenticator, installer):
    """Construct a ``client.Client`` for the selected plugins.

    An account is resolved through ``_determine_account`` only when an
    authenticator is supplied; otherwise the client is created with neither
    an account nor an ACME handle.

    :param config: Configuration object, handed through to the client.
    :param authenticator: Authenticator plugin, or ``None``.
    :param installer: Installer plugin, handed through to the client.

    :returns: The object produced by ``client.Client(...)``.

    """
    acc = acme = None
    if authenticator is not None:
        # An authenticator was given, so an account is going to be needed.
        acc, acme = _determine_account(config)
        logger.debug("Picked account: %r", acc)
        # XXX: the account key is not validated here
        # (crypto_util.validate_key_csr is intentionally not called).

    return client.Client(config, acc, authenticator, installer, acme=acme)
def unregister(config, unused_plugins):
    """Deactivate the account on the server and delete it locally.

    :param config: Configuration object.
    :param unused_plugins: Ignored.

    :returns: A message string when nothing was deactivated (no account
        exists, or the user declined the prompt); otherwise ``None``.

    """
    storage = account.AccountFileStorage(config)
    known_accounts = storage.find_all()
    reporter_util = zope.component.getUtility(interfaces.IReporter)

    if not known_accounts:
        return "Could not find existing account to deactivate."

    ask = zope.component.getUtility(interfaces.IDisplay).yesno
    question = ("Are you sure you would like to irrevocably deactivate "
                "your account?")
    confirmed = ask(question, yes_label='Deactivate', no_label='Abort',
                    default=True)
    if not confirmed:
        return "Deactivation aborted."

    acc, acme = _determine_account(config)
    acme_client = client.Client(config, acc, None, None, acme=acme)

    # Deactivate the registration on the server first ...
    acme_client.acme.deactivate_registration(acc.regr)

    # ... then remove the local account files.
    local_files = account.AccountFileStorage(config)
    local_files.delete(config.account)

    reporter_util.add_message(
        "Account deactivated.", reporter_util.MEDIUM_PRIORITY)
def register(config, unused_plugins):
    """Create an account, or update the e-mail of an existing one.

    Without ``config.update_registration`` a new account is registered
    (through ``_determine_account``) unless one already exists. With it,
    the contact e-mail of the existing account is replaced.

    :param config: Configuration object.
    :param unused_plugins: Ignored.

    :returns: A message string when the request cannot be carried out;
        otherwise ``None``.

    """
    # Same lookup _determine_account performs, to learn whether any account
    # exists yet.
    account_storage = account.AccountFileStorage(config)
    accounts = account_storage.find_all()
    reporter_util = zope.component.getUtility(interfaces.IReporter)

    def _report(text):
        return reporter_util.add_message(text, reporter_util.MEDIUM_PRIORITY)

    if not config.update_registration:
        # Registering a new account.
        if accounts:
            # TODO: add a flag to register a duplicate account (this will
            #       also require extending _determine_account's behavior
            #       or else extracting the registration code from there)
            return ("There is an existing account; registration of a "
                    "duplicate account with this command is currently "
                    "unsupported.")
        # _determine_account registers the account for us.
        _determine_account(config)
        return None

    # --update-registration
    if not accounts:
        return "Could not find an existing account to update."

    if config.email is None:
        if config.register_unsafely_without_email:
            return ("--register-unsafely-without-email provided, however, a "
                    "new e-mail address must\ncurrently be provided when "
                    "updating a registration.")
        config.email = display_ops.get_email(optional=False)

    acc, acme = _determine_account(config)
    acme_client = client.Client(config, acc, None, None, acme=acme)

    # No explicit success check: an exception is expected to interrupt us
    # if the update did not work.
    new_contact = ('mailto:' + config.email,)
    new_body = acc.regr.body.update(contact=new_contact)
    acc.regr = acme_client.acme.update_registration(
        acc.regr.update(body=new_body))
    account_storage.save_regr(acc, acme_client.acme)
    eff.handle_subscription(config)
    _report("Your e-mail address was updated to {0}.".format(config.email))
def _install_cert(config, le_client, domains, lineage=None):
    """Deploy a certificate and apply configuration enhancements.

    File paths are read from ``lineage`` when it is truthy and from
    ``config`` otherwise.

    :param config: Configuration object (fallback source of the paths).
    :param le_client: Client used to deploy the certificate.
    :param domains: Domains passed to the deploy and enhance calls.
    :param lineage: Optional object providing ``key_path``, ``cert_path``,
        ``chain_path`` and ``fullchain_path``.

    """
    paths = lineage or config
    assert paths.cert_path is not None

    le_client.deploy_certificate(
        domains,
        paths.key_path,
        paths.cert_path,
        paths.chain_path,
        paths.fullchain_path)
    le_client.enhance_config(domains, paths.chain_path)
def install(config, plugins):
    """Install a previously obtained cert in a server.

    :param config: Configuration object.
    :param plugins: Plugin registry handed to the plugin selector.

    :returns: The plugin-selection error rendered as a string when no
        installer could be chosen; otherwise ``None``.

    """
    # XXX: Update for renewer/RenewableCert
    # FIXME: be consistent about whether errors are raised or returned from
    # this function ...
    try:
        chosen_installer, _ = plug_sel.choose_configurator_plugins(
            config, plugins, "install")
    except errors.PluginSelectionError as selection_error:
        return str(selection_error)

    domains, _ = _find_domains_or_certname(config, chosen_installer)
    le_client = _init_le_client(
        config, authenticator=None, installer=chosen_installer)
    _install_cert(config, le_client, domains)
def plugins_cmd(config, plugins):  # TODO: Use IDisplay rather than print
    """List server software plugins.

    Prints the visible plugins filtered by ``config.ifaces``. With
    ``config.init`` the plugins are also initialised and verified; with
    ``config.prepare`` they are additionally prepared and only the available
    ones are printed.

    :param config: Configuration object.
    :param plugins: Plugin registry to list.

    """
    logger.debug("Expected interfaces: %s", config.ifaces)

    wanted_ifaces = config.ifaces if config.ifaces is not None else []
    filtered = plugins.visible().ifaces(wanted_ifaces)
    logger.debug("Filtered plugins: %r", filtered)

    if not (config.init or config.prepare):
        print(str(filtered))
        return

    filtered.init(config)
    verified = filtered.verify(wanted_ifaces)
    logger.debug("Verified plugins: %r", verified)

    if not config.prepare:
        print(str(verified))
        return

    verified.prepare()
    available = verified.available()
    logger.debug("Prepared plugins: %s", available)
    print(str(available))
def rollback(config, plugins):
    """Rollback server configuration changes made during install.

    Delegates to ``client.rollback`` using ``config.installer`` and
    ``config.checkpoints``.

    :param config: Configuration object.
    :param plugins: Plugin registry, passed through unchanged.

    """
    installer_name = config.installer
    checkpoints = config.checkpoints
    client.rollback(installer_name, checkpoints, config, plugins)
def config_changes(config, unused_plugins):
    """Show changes made to server config during installation.

    View checkpoints and associated configuration changes by delegating to
    ``client.view_config_changes`` with ``config.num``.

    :param config: Configuration object.
    :param unused_plugins: Ignored.

    """
    how_many = config.num
    client.view_config_changes(config, num=how_many)
def update_symlinks(config, unused_plugins):
    """Update the certificate file family symlinks.

    Use the information in the config file to make symlinks point to
    the correct archive directory. The work is delegated to
    ``cert_manager.update_live_symlinks``.

    :param config: Configuration object.
    :param unused_plugins: Ignored.

    """
    cert_manager.update_live_symlinks(config)
def rename(config, unused_plugins):
    """Rename a certificate.

    Use the information in the config file to rename an existing
    lineage. The work is delegated to ``cert_manager.rename_lineage``.

    :param config: Configuration object.
    :param unused_plugins: Ignored.

    """
    cert_manager.rename_lineage(config)
def delete(config, unused_plugins):
    """Delete a certificate.

    Use the information in the config file to delete an existing
    lineage. The work is delegated to ``cert_manager.delete``.

    :param config: Configuration object.
    :param unused_plugins: Ignored.

    """
    cert_manager.delete(config)
def certificates(config, unused_plugins):
    """Display information about certs configured with Certbot.

    The work is delegated to ``cert_manager.certificates``.

    :param config: Configuration object.
    :param unused_plugins: Ignored.

    """
    cert_manager.certificates(config)
def revoke(config, unused_plugins):  # TODO: coop with renewal config
    """Revoke a previously obtained certificate.

    The revocation request is signed with the certificate's own key when
    ``config.key_path`` is set, and with the account key otherwise.

    :param config: Configuration object. ``cert_path`` and ``key_path`` are
        indexed here as pairs: element 0 is used for display and logging,
        element 1 is handed to the loader.
    :param unused_plugins: Ignored.

    :returns: The ACME client error rendered as a string when revocation
        fails; otherwise ``None``.

    """
    # For user-agent construction
    config.installer = config.authenticator = "None"

    if config.key_path is not None:  # revocation by cert key
        logger.debug("Revoking %s using cert key %s",
                     config.cert_path[0], config.key_path[0])
        key = jose.JWK.load(config.key_path[1])
    else:  # revocation by account key
        logger.debug("Revoking %s using Account Key", config.cert_path[0])
        acc, _ = _determine_account(config)
        key = acc.key

    acme = client.acme_from_config_key(config, key)
    cert = crypto_util.pyopenssl_load_certificate(config.cert_path[1])[0]
    logger.debug("Reason code for revocation: %s", config.reason)

    try:
        acme.revoke(jose.ComparableX509(cert), config.reason)
    except acme_errors.ClientError as e:
        # str(e) instead of e.message: exceptions have no ``message``
        # attribute on Python 3, and install() already reports errors this
        # way.
        return str(e)

    display_ops.success_revocation(config.cert_path[0])
def run(config, plugins):  # pylint: disable=too-many-branches,too-many-locals
    """Obtain a certificate and install.

    :param config: Configuration object.
    :param plugins: Plugin registry handed to the plugin selector.

    :returns: The plugin-selection error rendered as a string when no
        suitable plugins could be chosen; otherwise ``None``.

    """
    # TODO: Make run as close to auth + install as possible
    # Possible difficulties: config.csr was hacked into auth
    try:
        installer, authenticator = plug_sel.choose_configurator_plugins(
            config, plugins, "run")
    except errors.PluginSelectionError as e:
        # str(e) instead of e.message: exceptions have no ``message``
        # attribute on Python 3, and install() already reports errors this
        # way.
        return str(e)

    # TODO: Handle errors from _init_le_client?
    le_client = _init_le_client(config, authenticator, installer)

    domains, certname = _find_domains_or_certname(config, installer)
    should_get_cert, lineage = _find_cert(config, domains, certname)

    # Keep the existing lineage unless a new certificate is obtained.
    new_lineage = lineage
    if should_get_cert:
        new_lineage = _get_and_save_cert(le_client, config, domains,
                                         certname, lineage)

    cert_path = new_lineage.cert_path if new_lineage else None
    fullchain_path = new_lineage.fullchain_path if new_lineage else None
    _report_new_cert(config, cert_path, fullchain_path)

    _install_cert(config, le_client, domains, new_lineage)

    # Report a renewal only when a pre-existing lineage was actually
    # replaced; everything else counts as a plain installation.
    if lineage is None or not should_get_cert:
        display_ops.success_installation(domains)
    else:
        display_ops.success_renewal(domains)

    _suggest_donation_if_appropriate(config)
def _csr_get_and_save_cert(config, le_client):
    """Obtain a cert using a user-supplied CSR.

    This works differently in the CSR case (for now) because we don't
    have the privkey, and therefore can't construct the files for a lineage.
    So we just save the cert & chain to disk :/

    :param config: Configuration object providing ``actual_csr``,
        ``domains``, ``dry_run`` and the output paths.
    :param le_client: Client used to obtain and save the certificate.

    :returns: ``(cert_path, fullchain_path)`` as reported by
        ``le_client.save_certificate``, or ``(None, None)`` on a dry run.

    """
    csr, typ = config.actual_csr
    certr, chain = le_client.obtain_certificate_from_csr(
        config.domains, csr, typ)

    if config.dry_run:
        logger.debug(
            "Dry run: skipping saving certificate to %s", config.cert_path)
        return None, None

    saved_cert, _, saved_fullchain = le_client.save_certificate(
        certr, chain,
        config.cert_path, config.chain_path, config.fullchain_path)
    return saved_cert, saved_fullchain
def renew_cert(config, plugins, lineage):
    """Renew & save an existing cert. Do not install it.

    When an installer plugin was selected it is restarted afterwards; in
    both cases the user is notified through the ``IDisplay`` utility.

    :param config: Configuration object.
    :param plugins: Plugin registry handed to the plugin selector.
    :param lineage: Lineage being renewed; its ``fullchain`` is reported.

    :raises errors.PluginSelectionError: re-raised after logging when no
        suitable plugin can be chosen.

    """
    try:
        # installers are used in auth mode to determine domain names
        installer, auth = plug_sel.choose_configurator_plugins(
            config, plugins, "certonly")
    except errors.PluginSelectionError as selection_error:
        logger.info("Could not choose appropriate plugin: %s", selection_error)
        raise

    le_client = _init_le_client(config, auth, installer)
    _get_and_save_cert(le_client, config, lineage=lineage)

    notify = zope.component.getUtility(interfaces.IDisplay).notification

    if installer is not None:
        # In case of a renewal, reload server to pick up new certificate.
        # In principle we could have a configuration option to inhibit this
        # from happening.
        installer.restart()
        message = ("new certificate deployed with reload of {0} server; "
                   "fullchain is {1}").format(
                       config.installer, lineage.fullchain)
    else:
        message = ("new certificate deployed without reload, "
                   "fullchain is {0}").format(lineage.fullchain)

    notify(message, pause=False)
def certonly(config, plugins):
    """Authenticate & obtain cert, but do not install it.

    This implements the 'certonly' subcommand.

    :param config: Configuration object.
    :param plugins: Plugin registry handed to the plugin selector.

    :raises errors.PluginSelectionError: re-raised after logging when no
        suitable plugin can be chosen.

    """
    # SETUP: Select plugins and construct a client instance
    try:
        # installers are used in auth mode to determine domain names
        installer, auth = plug_sel.choose_configurator_plugins(
            config, plugins, "certonly")
    except errors.PluginSelectionError as selection_error:
        logger.info("Could not choose appropriate plugin: %s", selection_error)
        raise

    le_client = _init_le_client(config, auth, installer)

    if config.csr:
        # User-supplied CSR: no lineage is involved.
        cert_path, fullchain_path = _csr_get_and_save_cert(config, le_client)
    else:
        domains, certname = _find_domains_or_certname(config, installer)
        should_get_cert, lineage = _find_cert(config, domains, certname)

        if not should_get_cert:
            notify = zope.component.getUtility(interfaces.IDisplay).notification
            notify("Certificate not yet due for renewal; no action taken.",
                   pause=False)
            return

        lineage = _get_and_save_cert(
            le_client, config, domains, certname, lineage)
        if lineage:
            cert_path = lineage.cert_path
            fullchain_path = lineage.fullchain_path
        else:
            cert_path = None
            fullchain_path = None

    _report_new_cert(config, cert_path, fullchain_path)
    _suggest_donation_if_appropriate(config)
def renew(config, unused_plugins):
    """Renew previously-obtained certificates.

    The saved post-hooks are run whether or not the renewal request raises.

    :param config: Configuration object.
    :param unused_plugins: Ignored.

    """
    try:
        renewal.handle_renewal_request(config)
    finally:
        # Always executed, including when the renewal above fails.
        hooks.run_saved_post_hooks()
def setup_log_file_handler(config, logfile, fmt):
    """Setup file debug logging.

    :param config: Configuration object providing ``logs_dir``.
    :param str logfile: File name of the log inside ``config.logs_dir``.
    :param str fmt: Format string for the log records.

    :returns: ``(handler, log_file_path)``.
    :raises errors.Error: if the log file cannot be opened (``IOError``).

    """
    log_file_path = os.path.join(config.logs_dir, logfile)

    try:
        handler = logging.handlers.RotatingFileHandler(
            log_file_path, maxBytes=2 ** 20, backupCount=1000)
    except IOError as io_error:
        raise errors.Error(_PERM_ERR_FMT.format(io_error))

    # rotate on each invocation, rollover only possible when maxBytes
    # is nonzero and backupCount is nonzero, so we set maxBytes as big
    # as possible not to overrun in single CLI invocation (1MB).
    handler.doRollover()  # TODO: creates empty letsencrypt.log.1 file
    handler.setLevel(logging.DEBUG)

    formatter = logging.Formatter(fmt=fmt)
    formatter.converter = time.gmtime  # don't use localtime
    handler.setFormatter(formatter)

    return handler, log_file_path
def _cli_log_handler(level, fmt):
    """Create the terminal log handler.

    :param int level: Level set on the handler.
    :param str fmt: Format string for the log records.

    :returns: A configured ``colored_logging.StreamHandler``.

    """
    stream_handler = colored_logging.StreamHandler()
    formatter = logging.Formatter(fmt)
    stream_handler.setFormatter(formatter)
    stream_handler.setLevel(level)
    return stream_handler
def setup_logging(config):
    """Sets up logging to logfiles and the terminal.

    The root logger is set to DEBUG so every record reaches both handlers;
    the terminal handler then filters by the level derived from
    ``config.quiet`` / ``config.verbose_count``.

    :param certbot.interface.IConfig config: Configuration object

    """
    cli_fmt = "%(message)s"
    file_fmt = "%(asctime)s:%(levelname)s:%(name)s:%(message)s"
    logfile = "letsencrypt.log"

    level = (constants.QUIET_LOGGING_LEVEL if config.quiet
             else -config.verbose_count * 10)

    file_handler, log_file_path = setup_log_file_handler(
        config, logfile=logfile, fmt=file_fmt)
    cli_handler = _cli_log_handler(level, cli_fmt)

    # TODO: use fileConfig?
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)  # send all records to handlers
    for handler in (cli_handler, file_handler):
        root_logger.addHandler(handler)

    logger.debug("Root logging level set at %d", level)
    logger.info("Saving debug log to %s", log_file_path)
def _handle_exception(exc_type, exc_value, trace, config):
    """Logs exceptions and reports them to the user.

    Config is used to determine how to display exceptions to the user. In
    general, if config.debug is True, then the full exception and traceback is
    shown to the user, otherwise it is suppressed. If config itself is None,
    then the traceback and exception is attempted to be written to a logfile.
    If this is successful, the traceback is suppressed, otherwise it is shown
    to the user. sys.exit is always called with a nonzero status.

    :param exc_type: Class of the exception being handled.
    :param exc_value: The exception instance.
    :param trace: Traceback object of the exception.
    :param config: Configuration object, or ``None`` when the failure
        happened before the configuration was available.

    """
    tb_str = "".join(traceback.format_exception(exc_type, exc_value, trace))
    logger.debug("Exiting abnormally:%s%s", os.linesep, tb_str)

    if issubclass(exc_type, Exception) and (config is None or not config.debug):
        if config is None:
            logfile = "certbot.log"
            try:
                with open(logfile, "w") as logfd:
                    traceback.print_exception(
                        exc_type, exc_value, trace, file=logfd)
            except:  # pylint: disable=bare-except
                # Last-resort handler: whatever went wrong while writing the
                # log file, show the traceback instead of losing it.
                sys.exit(tb_str)
            # Without a config object, --debug can only be detected from the
            # raw command line.
            if "--debug" in sys.argv:
                sys.exit(tb_str)

        if issubclass(exc_type, errors.Error):
            sys.exit(exc_value)
        else:
            # Here we're passing a client or ACME error out to the client at the shell
            # Tell the user a bit about what happened, without overwhelming
            # them with a full traceback
            err = traceback.format_exception_only(exc_type, exc_value)[0]
            # Typical error from the ACME module:
            # acme.messages.Error: urn:ietf:params:acme:error:malformed :: The
            # request message was malformed :: Error creating new registration
            # :: Validation of contact mailto:none@longrandomstring.biz failed:
            # Server failure at resolver
            #
            # config may be None here; in that case the verbosity is unknown
            # and is treated as the default instead of dereferencing None.
            default_verbosity = (
                config is None or
                config.verbose_count <= cli.flag_default("verbose_count"))
            if (messages.is_acme_error(err) and ":: " in err and
                    default_verbosity):
                # prune ACME error code, we have a human description
                _code, _sep, err = err.partition(":: ")
            msg = "An unexpected error occurred:\n" + err + "Please see the "
            if config is None:
                msg += "logfile '{0}' for more details.".format(logfile)
            else:
                msg += "logfiles in {0} for more details.".format(config.logs_dir)
            sys.exit(msg)
    else:
        sys.exit(tb_str)
def make_or_verify_core_dir(directory, mode, uid, strict):
    """Make sure directory exists with proper permissions.

    Thin wrapper around ``util.make_or_verify_dir`` that converts an
    ``OSError`` into ``errors.Error``.

    :param str directory: Path to a directory.
    :param int mode: Directory mode.
    :param int uid: Directory owner.
    :param bool strict: require directory to be owned by current user

    :raises .errors.Error: if the directory cannot be made or verified

    """
    try:
        util.make_or_verify_dir(directory, mode, uid, strict)
    except OSError as os_error:
        message = _PERM_ERR_FMT.format(os_error)
        raise errors.Error(message)
def make_or_verify_needed_dirs(config):
    """Create or verify existence of config, work, or logs directories.

    The directories are handled in that order; the logs directory uses mode
    ``0o700`` while the other two use ``constants.CONFIG_DIRS_MODE``.

    :param config: Configuration object providing the directory paths and
        ``strict_permissions``.

    """
    # TODO: logs might contain sensitive data such as contents of the
    # private key! #525
    needed = (
        (config.config_dir, constants.CONFIG_DIRS_MODE),
        (config.work_dir, constants.CONFIG_DIRS_MODE),
        (config.logs_dir, 0o700),
    )
    for directory, mode in needed:
        make_or_verify_core_dir(
            directory, mode, os.geteuid(), config.strict_permissions)
def set_displayer(config):
    """Set the displayer.

    Quiet mode forces ``config.noninteractive_mode`` on and sends output to
    ``os.devnull``; otherwise output goes to ``sys.stdout`` through either a
    non-interactive or a file display. The chosen display is registered with
    ``zope.component.provideUtility``.

    :param config: Configuration object.

    """
    if config.quiet:
        config.noninteractive_mode = True
        sink = open(os.devnull, "w")
        display = display_util.NoninteractiveDisplay(sink)
    elif config.noninteractive_mode:
        display = display_util.NoninteractiveDisplay(sys.stdout)
    else:
        display = display_util.FileDisplay(
            sys.stdout, config.force_interactive)

    zope.component.provideUtility(display)
def _post_logging_setup(config, plugins, cli_args):
    """Perform any setup or configuration tasks that require a logger.

    :param config: Configuration object.
    :param plugins: Discovered plugins (logged for debugging).
    :param cli_args: Command line arguments (logged for debugging).

    """
    # This needs logging, but would otherwise be in HelpfulArgumentParser
    if config.validate_hooks:
        hooks.validate_hooks(config)

    cli.possible_deprecation_warning(config)

    logger.debug("certbot version: %s", certbot.__version__)
    # do not log `config`, as it contains sensitive data (e.g. revoke --key)!
    for template, value in (("Arguments: %r", cli_args),
                            ("Discovered plugins: %r", plugins)):
        logger.debug(template, value)
def main(cli_args=None):
    """Command line argument parsing and main script execution.

    :param list cli_args: Command line arguments without the program name.
        Defaults to ``sys.argv[1:]``, read when the function is called
        rather than once at import time.

    :returns: Whatever the selected subcommand (``config.func``) returns.

    """
    if cli_args is None:
        cli_args = sys.argv[1:]

    # No config exists yet; the hook is replaced once one is available.
    sys.excepthook = functools.partial(_handle_exception, config=None)
    plugins = plugins_disco.PluginsRegistry.find_all()

    # note: arg parser internally handles --help (and exits afterwards)
    args = cli.prepare_and_parse_args(plugins, cli_args)
    config = configuration.NamespaceConfig(args)
    zope.component.provideUtility(config)

    make_or_verify_needed_dirs(config)

    # Setup logging ASAP, otherwise "No handlers could be found for
    # logger ..." TODO: this should be done before plugins discovery
    setup_logging(config)
    _post_logging_setup(config, plugins, cli_args)

    sys.excepthook = functools.partial(_handle_exception, config=config)

    set_displayer(config)

    # Reporter
    report = reporter.Reporter(config)
    zope.component.provideUtility(report)
    atexit.register(report.atexit_print_messages)

    return config.func(config, plugins)
if __name__ == "__main__":
    # main() returns an error message (or a falsy value on success); log a
    # non-empty message before handing it to sys.exit as the exit status.
    err_string = main()
    if err_string:
        logger.warning("Exiting with message %s", err_string)
    sys.exit(err_string)  # pragma: no cover