aws-cert/aws-cert

249 lines
7.1 KiB
Ruby
Executable File

#!/usr/bin/env ruby
# The "main" gem is a framework for command line applications
require "main"
# load configuration from .env file
require 'dotenv/load'
# Import only the relevant AWS SDK
require 'aws-sdk-acm'
require 'aws-sdk-elasticloadbalancingv2'
# Useful for debugging
# require "pry"
class ACM
attr_reader :client
def initialize(client)
@client = client
end
# return the ARN of a certificate from its domain name
def certificate_arn_by_domain(domain_name)
cert_summary = client.list_certificates.certificate_summary_list.detect { |cert_summary|
cert_summary.domain_name == domain_name
}
cert_summary.certificate_arn
end
# return the certificate_summary_list object from AWS sdk or yields if a block is given
def list_certificates(&block)
response = client.list_certificates
if block_given?
response.certificate_summary_list.each do |cert_summary|
yield(cert_summary)
end
else
response.certificate_summary_list
end
end
# return the certificate object from AWS sdk or yields if a block is given
def describe_certificate(certificate_arn_or_domain_name, &block)
certificate_arn = if acm_arn?(certificate_arn_or_domain_name)
certificate_arn_or_domain_name
else
certificate_arn_by_domain(certificate_arn_or_domain_name)
end
response = client.describe_certificate(certificate_arn: certificate_arn)
certificate = response.certificate
if block_given?
yield(certificate)
else
certificate
end
end
# return the certificate ARN of the created certificate
def request_certificate(domain_name)
response = client.request_certificate({
domain_name: domain_name,
validation_method: "DNS",
# idempotency_token: "IdempotencyToken",
})
response.certificate_arn
end
# return true/false if the input looks like an ACM ARN
def acm_arn?(input)
/\Aarn:aws:acm:/.match(input)
end
end
# print the certificate status (used in a yielded block)
def print_certificate_status(certificate)
puts "Domain name: #{certificate.domain_name}"
puts " ARN: #{certificate.certificate_arn}"
puts " Status: #{certificate.status}"
puts " Created at: #{certificate.created_at}"
puts " Not before: #{certificate.not_before}"
puts " Not after: #{certificate.not_after}"
puts " Issuer: #{certificate.issuer}"
puts " Renewable eligibility: #{certificate.renewal_eligibility}"
if certificate.domain_validation_options
certificate.domain_validation_options.each do |validation_option|
puts " Validation option: #{validation_option.validation_method}"
puts " Status: #{validation_option.validation_status}"
case validation_option.validation_method
when "DNS"
if validation_option.resource_record
puts " Record name: #{validation_option.resource_record.name}"
puts " Record type: #{validation_option.resource_record.type}"
puts " Record value: #{validation_option.resource_record.value}"
else
puts " Record: _unavailable_"
end
when "EMAIL"
puts " Emails : #{validation_option.validation_emails}"
end
end
else
puts " Validation option: _unavailable_"
end
end
# print the certificate summary (used in a yielded block)
def print_certificate_summary(cert_summary)
puts "Domain name: #{cert_summary.domain_name}"
puts " ARN: #{cert_summary.certificate_arn}"
end
class ELBv2
attr_reader :client
def initialize(client)
@client = client
end
# return a boolean indicating if the cert has been attached to the listener
def add_listener_certificate(listener_arn, certificate_arn)
response = client.add_listener_certificates({
listener_arn: listener_arn,
certificates: [
{
certificate_arn: certificate_arn,
# is_default: false,
},
],
})
response.certificates.first.certificate_arn == certificate_arn
end
# return the load_balancer object from AWS sdk or yields if a block is given
def describe_load_balancer(load_balancer_arn)
response = client.describe_load_balancers({
load_balancer_arns: [
load_balancer_arn
],
})
load_balancer = response.load_balancers.first
if block_given?
yield(load_balancer)
else
load_balancer
end
end
end
Main {
mode 'list' do
def run()
acm = ACM.new(acm_client)
acm.list_certificates do |cert_summary|
print_certificate_summary(cert_summary)
end
rescue Aws::Errors::ServiceError => ex
exit_status exit_failure
puts ex.message
end
end
mode 'status' do
argument('domain') { required }
def run()
domain_name = params[:domain].value
acm = ACM.new(acm_client)
acm.describe_certificate(domain_name) do |certificate|
exit_status exit_failure unless certificate.status == "ISSUED"
print_certificate_status(certificate)
end
rescue Aws::Errors::ServiceError => ex
puts "#{ex.class.name}: #{ex.message}"
exit_failure!
end
end
mode 'create' do
argument('domain') { required }
def run()
domain_name = params[:domain].value
acm = ACM.new(acm_client)
certificate_arn = acm.request_certificate(domain_name)
acm.describe_certificate(certificate_arn) do |certificate|
exit_status exit_failure unless certificate.status == "ISSUED"
print_certificate_status(certificate)
end
rescue Aws::Errors::ServiceError => ex
puts "#{ex.class.name}: #{ex.message}"
exit_failure!
end
end
mode 'enable' do
argument('domain') { required }
environment('ELB_ARN'){ required }
environment('ELB_LISTENER_ARN'){ required }
def run()
domain_name = params['domain'].value
elb_arn = params['ELB_ARN'].value
elb_listener_arn = params['ELB_LISTENER_ARN'].value
acm = ACM.new(acm_client)
certificate_arn = acm.certificate_arn_by_domain(domain_name)
certificate = acm.describe_certificate(certificate_arn)
if certificate.status == "ISSUED"
elbv2 = ELBv2.new(elbv2_client)
elbv2.add_listener_certificate(elb_listener_arn, certificate_arn)
elbv2.describe_load_balancer(elb_arn) do |load_balancer|
puts "Certificate has been added to load-balancer, change DNS configuration :"
puts " Record name: #{domain_name}."
puts " Record type: CNAME"
puts " Record value: #{load_balancer.dns_name}."
end
else
exit_status exit_failure
puts "Certificate for '#{domain_name}' is not available for ELB (status: #{certificate.status})"
end
rescue Aws::Errors::ServiceError => ex
puts "#{ex.class.name}: #{ex.message}"
exit_failure!
end
end
def acm_client
args = {}
args[:profile] = ENV['AWS_PROFILE'] if ENV['AWS_PROFILE']
# args[:logger] = Logger.new(STDOUT)
Aws::ACM::Client.new(args)
end
def elbv2_client
args = {}
args[:profile] = ENV['AWS_PROFILE'] if ENV['AWS_PROFILE']
Aws::ElasticLoadBalancingV2::Client.new(args)
end
}