Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions samcli/local/lambdafn/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import os
import zipfile

from samcli.commands.exceptions import UserException

LOG = logging.getLogger(__name__)

S_IFLNK = 0xA
Expand Down Expand Up @@ -48,15 +50,27 @@ def _extract(file_info, output_dir, zip_ref):
-------
string
Returns the target path the Zip Entry was extracted to.

Raises
------
ValueError
If the extraction path is not valid
"""

# Handle any regular file/directory entries
if not _is_symlink(file_info):
return zip_ref.extract(file_info, output_dir)

source = zip_ref.read(file_info.filename).decode("utf8")
output_dir = os.path.normpath(output_dir)
link_name = os.path.normpath(os.path.join(output_dir, file_info.filename))

output_dir_abs = os.path.abspath(output_dir)
link_name_abs = os.path.abspath(link_name)

if not link_name_abs.startswith(output_dir_abs + os.sep) and link_name_abs != output_dir_abs:
raise UserException(f"Failed to extract file from the zip file. The '{file_info.filename}' is invalid")

# make leading dirs if needed
leading_dirs = os.path.dirname(link_name)
if not os.path.exists(leading_dirs):
Expand Down Expand Up @@ -85,19 +99,22 @@ def unzip(zip_file_path, output_dir, permission=None):
permission : int
Permission to set in an octal int form
"""

extracted_path = None
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
# For each item in the zip file, extract the file and set permissions if available
for file_info in zip_ref.infolist():
extracted_path = _extract(file_info, output_dir, zip_ref)

# If the extracted_path is a symlink, do not set the permissions. If the target of the symlink does not
# exist, then os.chmod will fail with FileNotFoundError
if not os.path.islink(extracted_path):
_set_permissions(file_info, extracted_path)
_override_permissions(extracted_path, permission)

if not os.path.islink(extracted_path):
try:
extracted_path = _extract(file_info, output_dir, zip_ref)

# If the extracted_path is a symlink, do not set the permissions. If the target of the symlink does not
# exist, then os.chmod will fail with FileNotFoundError
if not os.path.islink(extracted_path):
_set_permissions(file_info, extracted_path)
_override_permissions(extracted_path, permission)
except Exception as ex:
LOG.debug("Failed to extract '%s' from %s: %s", file_info.filename, zip_file_path, ex)

if extracted_path is not None and not os.path.islink(extracted_path):
_override_permissions(output_dir, permission)


Expand Down
Loading