#!/usr/bin/python3.6
"""
Generates a static site from CodeCommit source and deploys it to the production S3 bucket.
Many thanks to MMusket and Alestic, whose code this is adapted from:
https://github.com/mmusket/s3-hosting-guide
https://github.com/alestic/aws-lambda-codepipeline-site-generator-hugo
"""
from __future__ import print_function
import os
import time
import zipfile
import gzip
import tempfile
import shutil
import subprocess
import traceback
from mimetypes import MimeTypes
from multiprocessing.pool import ThreadPool
import boto3
from botocore.client import Config
from botocore.exceptions import ClientError
LAMBDA_TASK_ROOT = os.environ.get('LAMBDA_TASK_ROOT', os.path.dirname(os.path.abspath(__file__)))
CURR_BIN_DIR = os.path.join(LAMBDA_TASK_ROOT, 'bin')
### To get execute permissions right inside Lambda, the binaries have to be copied to /tmp
BIN_DIR = '/tmp/bin'
DONOTZIP = ['.jpg', '.png', '.ttf', '.woff', '.woff2', '.gif']
CP = boto3.client('codepipeline')
S3 = boto3.client('s3', config=Config(signature_version='s3v4'))
DEV = False
def setup(event):
    """ Extract the CodePipeline job id, source artifact location and target site bucket """
    job_id = event['CodePipeline.job']['id']
    job_data = event['CodePipeline.job']['data']
    input_artifact = job_data['inputArtifacts'][0]
    from_bucket = input_artifact['location']['s3Location']['bucketName']
    from_key = input_artifact['location']['s3Location']['objectKey']
    # environment variable passed along by the CloudFormation template
    to_bucket = os.environ['SiteBucket']
    return (job_id, from_bucket, from_key, to_bucket)
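# Illustrative only: the minimal event shape that setup() expects from
# CodePipeline. The job id, bucket name and object key below are placeholders:
#   {
#       "CodePipeline.job": {
#           "id": "11111111-2222-3333-4444-555555555555",
#           "data": {
#               "inputArtifacts": [{
#                   "location": {"s3Location": {
#                       "bucketName": "example-artifact-bucket",
#                       "objectKey": "MySite/SourceOut/example.zip"}}}]
#           }
#       }
#   }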
def download_source(from_bucket, from_key, source_dir):
    """ Download source code to be generated """
    with tempfile.NamedTemporaryFile() as tmp_file:
        S3.download_file(from_bucket, from_key, tmp_file.name)
        with zipfile.ZipFile(tmp_file.name, 'r') as zipobj:
            zipobj.extractall(source_dir)
def upload_site(site_dir, to_bucket):
    """
    Upload the generated site to the S3 site bucket.
    Every file is gzipped into a staging area first, except images and fonts,
    which are copied as-is; the staged files are then uploaded in parallel.
    """
    staging_dir = '/tmp/staging/'
    # Stage each generated file: gzip it unless its extension is in DONOTZIP
    for path in get_files(site_dir):
        if is_zip_file(path):
            zip_file(path, staging_dir + path)
        else:
            copy_file(path, staging_dir + path)
    # Upload the staged files to the site bucket with a small thread pool
    pool = ThreadPool(processes=5)
    pool.map(lambda path: upload_file(to_bucket, path, staging_dir, site_dir), get_files(staging_dir))
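# For example (file names are illustrative): index.html and css/site.css are
# gzipped before upload, while images/logo.png or fonts/site.woff2 are copied
# unchanged because their extensions are listed in DONOTZIP.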
def generate_static_site(source_dir, site_dir):
    """Generate static site using hugo."""
    _init_bin('hugo')
    command = [os.path.join(BIN_DIR, 'hugo'), "--source=" + source_dir, "--destination=" + site_dir]
    print(command)
    try:
        print(subprocess.check_output(command, shell=False, stderr=subprocess.STDOUT))
    except subprocess.CalledProcessError as error:
        print("ERROR return code: ", error.returncode)
        print("ERROR output: ", error.output)
        raise
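# For reference, the subprocess call above is equivalent to running something
# like the following (the temp directory names are placeholders):
#   /tmp/bin/hugo --source=/tmp/tmpsrc1234 --destination=/tmp/tmpsite5678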
def _init_bin(executable_name):
    start = time.clock()
    if not os.path.exists(BIN_DIR):
        print("Creating bin folder")
        os.makedirs(BIN_DIR)
    print("Copying binaries for " + executable_name + " in /tmp/bin")
    currfile = os.path.join(CURR_BIN_DIR, executable_name)
    newfile = os.path.join(BIN_DIR, executable_name)
    shutil.copy2(currfile, newfile)
    print("Giving new binaries permissions for lambda")
    os.chmod(newfile, 0o775)
    elapsed = (time.clock() - start)
    print(executable_name + " ready in " + str(elapsed) + 's.')
def get_files(base_folder):
    """ Returns a list containing all the filepaths under base_folder """
    file_paths = []
    # os.walk yields a (root, directories, files) tuple for each directory
    for root, directory, files in os.walk(base_folder):
        for filename in files:
            filepath = os.path.join(root, filename)
            file_paths.append(filepath)
    return file_paths
def zip_file(src, dst):
    """ Gzip src into dst, creating the destination directory if needed """
    if DEV:
        print('Zipping ' + src)
    dirname = os.path.dirname(dst)
    if not os.path.exists(dirname):
        os.makedirs(dirname)
    with open(src, 'rb') as f_in, gzip.open(dst, 'wb') as f_out:
        f_out.writelines(f_in)
def copy_file(src, dst):
    """ Copy src to the staging directory before it is uploaded to the destination bucket """
    if DEV:
        print('Copying ' + src)
    dirname = os.path.dirname(dst)
    if not os.path.exists(dirname):
        os.makedirs(dirname)
    shutil.copyfile(src, dst)
def is_zip_file(file_name):
    """ Return True if the file should be gzipped, i.e. its extension is not in DONOTZIP """
    extension = os.path.splitext(file_name)[1]
    if extension in DONOTZIP:
        return False
    return True
def upload_file(bucket_name, file_path, staging_dir, site_dir):
    """ Upload a single staged file, setting its content type and encoding """
    # Strip the staging prefix and the temporary site directory to get the S3 key
    destname = file_path.replace(staging_dir, "/")
    destname = destname.replace(site_dir + "/", "")
    if DEV:
        print("Uploading file " + file_path + ' to ' + destname)
    try:
        with open(file_path, 'rb') as data:
            ftype, encoding = MimeTypes().guess_type(file_path)
            con_type = ftype if ftype is not None else encoding if encoding is not None else 'text/plain'
            enc_type = 'gzip' if is_zip_file(file_path) else ''
            S3.put_object(Bucket=bucket_name, Key=destname, Body=data,
                          ContentEncoding=enc_type, ContentType=con_type, ACL='public-read')
    except ClientError as err:
        print("Failed to upload artefact to S3.\n" + str(err))
        return False
    except IOError as err:
        print("Failed to access artefact in this directory.\n" + str(err))
        return False
    return True
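# Example of the key derivation above (temp directory names are placeholders):
#   staged file  /tmp/staging/tmp/tmpsite5678/css/site.css
#   becomes key  css/site.css  in the site bucket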
def handler(event, context):
    """ Program event flow """
    job_id = source_dir = site_dir = None
    try:
        (job_id, from_bucket, from_key, to_bucket) = setup(event)
        source_dir = tempfile.mkdtemp()
        site_dir = tempfile.mkdtemp()
        download_source(from_bucket, from_key, source_dir)
        generate_static_site(source_dir, site_dir)
        upload_site(site_dir, to_bucket)
        CP.put_job_success_result(jobId=job_id)
    except Exception as exception:
        print("ERROR: " + repr(exception))
        traceback.print_exc()
        if job_id is not None:
            # failureDetails['message'] must be a string, not the exception object
            CP.put_job_failure_result(
                jobId=job_id, failureDetails={'message': str(exception), 'type': 'JobFailed'})
    finally:
        # The temp directories may not exist if an early step failed
        for tmp_dir in (source_dir, site_dir):
            if tmp_dir:
                shutil.rmtree(tmp_dir, ignore_errors=True)
    return "complete"