import fnmatch
import os
import glob
import logging
import subprocess
import tempfile
import atexit
import shutil
import time
import wrapt
import requests
from abc import ABCMeta, abstractmethod
from six import add_metaclass, PY3, string_types
from io import BytesIO, TextIOWrapper
from collections import namedtuple
from contextlib import contextmanager
from google.cloud import storage
DataFile = namedtuple('DataFile', ['path', 'handle'])
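
# A DataFile pairs a source or destination path with an open file-like handle.
# A hypothetical illustration (BytesIO stands in for a real file handle):
#     df = DataFile('gs://bucket/key.txt', BytesIO())
#     df.handle.write(b'payload')
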
@wrapt.decorator
def _retry_with_backoff(wrapped, instance, args, kwargs):
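    """ Retry the wrapped call when the GCS connection fails

    Makes up to 7 attempts, sleeping 4s, 8s, ... up to 128s between attempts,
    and re-raises the last connection error.
    """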
    trial = 0
    while True:
        wait = 2**(trial+2)  # 4s up to 128s
        try:
            return wrapped(*args, **kwargs)
        except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError):
            if trial == 6:
                raise
            logging.info('{} failed to connect, retrying after {}s'.format(wrapped.__name__, wait))
            trial += 1
            time.sleep(wait)


@add_metaclass(ABCMeta)
class FileSystem(object):
    """ The required interface for any filesystem implementation

    See GCSFileSystem for a full implementation. This FileSystem is intended
    to be extendable to support cloud file systems, encryption strategies, etc...
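
    Usage
    -----
    A minimal sketch of a local-only backend (hypothetical, for illustration):
    >>> class LocalOnlyFileSystem(FileSystem):
    ...     def ls(self, path):
    ...         return sorted(glob.glob(path))
    ...     def access(self, paths):
    ...         return [DataFile(p, open(p, 'rb')) for p in paths]
    ...     @contextmanager
    ...     def store(self, bucket, files):
    ...         datafiles = [DataFile(os.path.join(bucket, f),
    ...                               open(os.path.join(bucket, f), 'wb')) for f in files]
    ...         yield datafiles
    ...         for d in datafiles:
    ...             d.handle.close()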
"""
[docs] @abstractmethod
def ls(self, path):
""" List files correspond to path, including glob wildcards
Parameters
----------
path : str
The path to the file or directory to list; supports wildcards
"""
pass

    @abstractmethod
    def access(self, paths):
        """ Access multiple paths as file-like objects

        This allows for optimizations like parallel downloads

        Parameters
        ----------
        paths : list of str
            The paths of the files to access

        Returns
        -------
        files : list of DataFile
            A list of DataFile instances, one for each input path
        """
        pass

    @abstractmethod
    def store(self, bucket, files):
        """ Store multiple data objects

        This allows for optimizations when storing several files

        Parameters
        ----------
        bucket : str
            The GCS bucket to use to store the files
        files : list of str
            The file names to store

        Returns
        -------
        datafiles : contextmanager
            A context manager that will yield DataFiles and place them
            on the filesystem when finished
        """
        pass


class GCSFileSystem(FileSystem):
    """ File system interface that supports both local and GCS files

    This implementation uses subprocess and gsutil, which has excellent performance.
    However, it can lead to problems in heavily multi-threaded applications and might
    not be as portable. For a python-native implementation use GCSNativeFileSystem.
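
    Usage
    -----
    A minimal sketch (assumes gsutil is installed and authenticated; paths are
    illustrative):
    >>> fs = GCSFileSystem()
    >>> fs.cp('local.txt', 'gs://bucket/remote.txt')
    >>> fs.ls('gs://bucket/*.txt')
    ['gs://bucket/remote.txt']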
"""
GCS = 'gs://'

    def __init__(self, parallel=True, quiet=True):
        flags = []
        if parallel:
            flags.append('-m')
        if quiet:
            flags.append('-q')
        # base copy command, e.g. ['gsutil', '-m', '-q', 'cp']
        self.gcscp = ['gsutil'] + flags + ['cp']

    def local(self, path):
        """ Check if the path is available as a local file
        """
        return not path.startswith(self.GCS)

    def ls(self, path):
        """ List files corresponding to path, including glob wildcards

        Parameters
        ----------
        path : str
            The path to the file or directory to list; supports wildcards
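
        Usage
        -----
        Paths below are illustrative:
        >>> fs.ls('gs://bucket/data/*.csv')
        ['gs://bucket/data/part-1.csv', 'gs://bucket/data/part-2.csv']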
"""
logging.info('Globbing file content in {}'.format(path))
if not self.local(path):
with open(os.devnull, 'w') as DEVNULL:
p = subprocess.Popen(
['gsutil', 'ls', path],
stdout=subprocess.PIPE,
stderr=DEVNULL,
universal_newlines=True,
)
stdout = p.communicate()[0]
output = [line for line in stdout.split('\n') if line and line[-1] != ':']
elif '**' in path:
# Manual recursive glob, since in 2.X glob doesn't have recursive support
path = path.rstrip('*')
output = []
for root, subdirs, files in os.walk(path):
for fname in files:
output.append(os.path.join(root, fname))
elif os.path.isdir(path):
output = [os.path.join(path, f) for f in os.listdir(path)]
else:
output = glob.glob(path)
return sorted(p.rstrip('/') for p in output)

    def rm(self, paths, recursive=False):
        """ Remove the files at paths

        Parameters
        ----------
        paths : list of str
            The paths to remove
        recursive : bool, default False
            If true, recursively remove any directories
        """
        if isinstance(paths, string_types):
            paths = [paths]
        if any(not self.local(p) for p in paths):
            # at least one location is on GCS
            cmd = ['gsutil', '-m', 'rm']
        else:
            cmd = ['rm']
        if recursive:
            cmd.append('-r')
        # remove in chunks of 1000 to keep the argument list a manageable size
        CHUNK_SIZE = 1000
        paths_chunks = [paths[x:x+CHUNK_SIZE] for x in range(0, len(paths), CHUNK_SIZE)]
        for paths in paths_chunks:
            subprocess.check_call(cmd + paths)

    def cp(self, sources, dest, recursive=False):
        """ Copy the files in sources to dest

        Parameters
        ----------
        sources : list of str
            The list of paths to copy
        dest : str
            The destination for the copy of source(s)
        recursive : bool, default False
            If true, recursively copy any directories
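
        Usage
        -----
        A hypothetical copy of two local files into a bucket directory:
        >>> fs.cp(['a.txt', 'b.txt'], 'gs://bucket/dir/')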
"""
if isinstance(sources, string_types):
sources = [sources]
summary = ', '.join(sources)
logging.info('Copying {} to {}...'.format(summary, dest))
if any(self.GCS in x for x in sources + [dest]):
# at least one location is on GCS
cmd = self.gcscp
else:
cmd = ['cp']
if recursive:
cmd.append('-r')
CHUNK_SIZE = 1000
sources_chunks = [sources[x:x+CHUNK_SIZE] for x in range(0, len(sources), CHUNK_SIZE)]
for sources in sources_chunks:
subprocess.check_call(cmd + sources + [dest])

    @contextmanager
    def open(self, path, mode='rb'):
        """ Access path as a file-like object

        Parameters
        ----------
        path : str
            The path of the file to access
        mode : str
            The file mode for the opened file

        Returns
        -------
        file : file
            A python file opened to the provided path (uses a local temporary copy that is removed)
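
        Usage
        -----
        A hypothetical read of a remote file:
        >>> with fs.open('gs://bucket/remote.txt', 'rb') as f:
        ...     data = f.read()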
"""
with tempfile.NamedTemporaryFile() as nf:
if mode.startswith('r'):
self.cp(path, nf.name)
nf.seek(0)
with open(nf.name, mode) as f:
yield f
nf.seek(0)
if mode.startswith('w'):
self.cp(nf.name, path)

    def access(self, paths):
        """ Access multiple paths as file-like objects

        This allows for optimizations like parallel downloads

        Parameters
        ----------
        paths : list of str
            The paths of the files to access

        Returns
        -------
        files : list of DataFile
            A list of DataFile instances, one for each input path
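
        Usage
        -----
        Paths are illustrative:
        >>> for datafile in fs.access(['gs://bucket/a.txt', 'gs://bucket/b.txt']):
        ...     print(datafile.path)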
"""
# Move the files into a tempdir from GCS
tmpdir = _session_tempdir()
self.cp(paths, tmpdir, recursive=True)
# Then get file handles for each
datafiles = []
for path in paths:
local = os.path.join(tmpdir, os.path.basename(path))
datafiles.append(DataFile(path, open(local, 'rb')))
return datafiles

    @contextmanager
    def store(self, bucket, files):
        """ Create file stores that will be written to the filesystem on close

        This allows for optimizations when storing several files

        Parameters
        ----------
        bucket : str
            The path of the bucket (on GCS) or folder (local) to store the data in
        files : list of str
            The filenames to create

        Returns
        -------
        datafiles : contextmanager
            A context manager that yields DataFiles; when the context is closed
            they are written to GCS

        Usage
        -----
        >>> with filesystem.store('gs://bucket/sub/', ['ex1.txt', 'ex2.txt']) as datafiles:
        ...     datafiles[0].handle.write('example 1')
        ...     datafiles[1].handle.write('example 2')
        """
        # Make local files in a tempdir that serve as the file handles
        tmpdir = _session_tempdir()
        datafiles = []
        local_files = []
        for f in files:
            local = os.path.join(tmpdir, f)
            local_files.append(local)
            datafiles.append(DataFile(os.path.join(bucket, f), open(local, 'wb')))
        yield datafiles
        for d in datafiles:
            d.handle.close()
        if self.local(bucket) and not os.path.exists(bucket):
            os.makedirs(bucket)
        self.cp(local_files, os.path.join(bucket, ''), recursive=True)


class GCSNativeFileSystem(GCSFileSystem):
    """ File system interface that supports GCS and local files

    This uses the native python cloud storage library for read and write, rather
    than gsutil. The performance is significantly slower when doing any operations
    over several files (especially copy), but it is thread-safe for applications
    which are already parallelized. It stores the files entirely in memory rather
    than using tempfiles.
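
    Usage
    -----
    A minimal sketch (assumes google-cloud-storage credentials are configured;
    paths are illustrative):
    >>> fs = GCSNativeFileSystem()
    >>> with fs.open('gs://bucket/remote.txt', 'rb') as f:
    ...     data = f.read()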
"""
def __init__(self, *args, **kwargs):
self._client = None
super(GCSNativeFileSystem, self).__init__(*args, **kwargs)

    def client(self):
        # Load client only when needed, so that this can be used for local paths
        # without connecting
        if self._client is None:
            self._client = storage.Client()
        return self._client

    def ls(self, path):
        """ List all files at the specified path, supports globbing
        """
        logging.info('Globbing file content in {}'.format(path))
        # use GCSFileSystem's implementation for local paths
        if self.local(path):
            return super(GCSNativeFileSystem, self).ls(path)
        # find all names that start with the prefix for this path
        bucket, path = self._split(path)
        prefix = self._prefix(path)
        if prefix == path:
            # no pattern matching
            iterator = self._list_blobs(bucket, prefix=prefix, delimiter='/')
            names = [b.name for b in iterator]
            names += iterator.prefixes
            # if we ran ls('gs://bucket/dir') we need to rerun with '/' to get the content
            if names == [os.path.join(path, '')]:
                return self.ls(os.path.join('gs://' + bucket, path, ''))
        else:
            # we have a pattern to match
            names = [b.name for b in self._list_blobs(bucket, prefix=prefix)]
            # for recursive glob, do not attempt to match folders, only files
            if '**' in path:
                names = fnmatch.filter(names, path)
            else:
                # make sure we can match folders in addition to blobs
                candidates = set(names) | set(self._list_single(prefix, names))
                names = fnmatch.filter(candidates, path)
                # one more filter because fnmatch recurses single *
                names = [name for name in names if name.count('/') == path.count('/')]
        paths = ['gs://{}/{}'.format(bucket, name) for name in names]
        return sorted(p.rstrip('/') for p in paths)

    def is_dir(self, path):
        # Check if a path is a directory, locally or on GCS
        if self.local(path):
            return os.path.isdir(path)
        elif path.endswith('/'):
            return True
        else:
            return self.ls(path) != [path]

    def copy_single(self, source, dest):
        local_source = self.local(source)
        local_dest = self.local(dest)
        if local_source and local_dest:
            # local -> local
            shutil.copy(source, dest)
        if not local_source and local_dest:
            # GCS -> local download
            if os.path.isdir(dest):
                dest = os.path.join(dest, os.path.basename(source))
            self._blob(source).download_to_filename(dest)
        if local_source and not local_dest:
            # local -> GCS upload
            if dest.endswith('/'):
                dest = os.path.join(dest, os.path.basename(source))
            self._blob(dest).upload_from_filename(source)
        if not local_source and not local_dest:
            # GCS -> GCS server-side copy
            if dest.endswith('/'):
                dest = os.path.join(dest, os.path.basename(source))
            self._transfer(source, dest)

    def cp(self, sources, dest, recursive=False):
        """ Copy the files in sources (recursively) to dest

        Parameters
        ----------
        sources : list of str
            The list of paths to copy, which can be directories
        dest : str
            The destination for the copy of source(s)
        recursive : bool, default False
            If true, recursively copy directories
        """
        if isinstance(sources, string_types):
            sources = [sources]
        for source in sources:
            if recursive and self.is_dir(source):
                # Note: if source ends with a '/', this copies the content into dest
                # and if source does not, this copies the whole directory into dest;
                # this is the same behavior as copy
                subsource = [s.rstrip('/') for s in self.ls(source)]
                subdest = os.path.join(dest, os.path.basename(source), '')
                if self.local(subdest) and not os.path.exists(subdest):
                    os.makedirs(subdest)
                self.cp(subsource, subdest, recursive=True)
            else:
                self.copy_single(source, dest)

    def rm_single(self, path):
        if self.local(path):
            os.remove(path)
        else:
            self._blob(path).delete()

    def rm(self, paths, recursive=False):
        """ Remove the files at paths

        Parameters
        ----------
        paths : list of str
            The paths to remove
        recursive : bool, default False
            If true, recursively remove any directories
        """
        if isinstance(paths, string_types):
            paths = [paths]
        for path in paths:
            if recursive and self.is_dir(path) and not self.local(path):
                self.rm(self.ls(path), recursive=True)
            elif recursive and self.is_dir(path):
                shutil.rmtree(path)
            else:
                self.rm_single(path)

    @contextmanager
    def open(self, path, mode='rb'):
        """ Access path as a file-like object

        Parameters
        ----------
        path : str
            The path of the file to access
        mode : str
            The file mode for the opened file

        Returns
        -------
        file : BytesIO
            A BytesIO handle for the specified path, works like a file object
        """
        datafile = DataFile(path, BytesIO())
        if mode.startswith('r'):
            self._read(datafile)
        if not mode.endswith('b') and PY3:
            handle = TextIOWrapper(datafile.handle)
        else:
            handle = datafile.handle
        yield handle
        if mode.startswith('w'):
            handle.seek(0)
            self._write(datafile)
        datafile.handle.close()

    def access(self, paths):
        """ Access multiple paths as file-like objects

        This allows for optimizations like parallel downloads. To help track which
        files came from which objects, this returns instances of DataFile

        Parameters
        ----------
        paths : list of str
            The paths of the files to access

        Returns
        -------
        files : list of DataFile
            A list of DataFile instances, one for each input path
        """
        datafiles = []
        for path in paths:
            datafiles.append(DataFile(path, BytesIO()))
        for datafile in datafiles:
            self._read(datafile)
        return datafiles

    @contextmanager
    def store(self, bucket, files):
        """ Create file stores that will be written to the filesystem on close

        This allows for optimizations when storing several files

        Parameters
        ----------
        bucket : str
            The path of the bucket (on GCS) or folder (local) to store the data in
        files : list of str
            The filenames to create

        Returns
        -------
        datafiles : contextmanager
            A context manager that yields DataFiles; when the context is closed
            they are written to GCS

        Usage
        -----
        >>> with filesystem.store('gs://bucket/sub/', ['ex1.txt', 'ex2.txt']) as datafiles:
        ...     datafiles[0].handle.write('example 1')
        ...     datafiles[1].handle.write('example 2')
        """
        # Make BytesIO instances that serve as the file handles
        datafiles = []
        for f in files:
            datafiles.append(DataFile(os.path.join(bucket, f), BytesIO()))
        yield datafiles
        for d in datafiles:
            self._write(d)

    def _prefix(self, path):
        # find the longest leading prefix that doesn't include a pattern match
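        # e.g. _prefix('data/2020/*/part-?.csv') returns 'data/2020' (illustrative)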
        splits = path.split('/')
        accumulate = []
        while splits:
            sub = splits.pop(0)
            if any(x in sub for x in ['*', '?', '[', ']']):
                break
            accumulate.append(sub)
        return '/'.join(accumulate) if accumulate else None

    def _list_single(self, prefix, names):
        # find files that are directly in the dir specified by prefix, not in subfolders
        valid = set(n.replace(prefix, '').lstrip('/').split('/')[0] for n in names)
        return [os.path.join(prefix, n) for n in valid]

    def _split(self, path):
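        # split a GCS path into bucket and key, e.g. (illustrative)
        # _split('gs://bucket/a/b.txt') returns ('bucket', 'a/b.txt')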
        bucket = path.replace(self.GCS, '').split('/')[0]
        prefix = "gs://{}".format(bucket)
        path = path[len(prefix) + 1:]
        return bucket, path

    @_retry_with_backoff
    def _list_blobs(self, bucket, prefix=None, delimiter=None):
        return self.client().get_bucket(bucket).list_blobs(prefix=prefix, delimiter=delimiter)

    @_retry_with_backoff
    def _blob(self, path):
        bucket, path = self._split(path)
        return storage.Blob(path, self.client().get_bucket(bucket))

    @_retry_with_backoff
    def _transfer(self, path1, path2):
        bucket1, path1 = self._split(path1)
        bucket2, path2 = self._split(path2)
        source_bucket = self.client().get_bucket(bucket1)
        source_blob = source_bucket.blob(path1)
        destination_bucket = self.client().get_bucket(bucket2)
        source_bucket.copy_blob(source_blob, destination_bucket, path2)

    @_retry_with_backoff
    def _read(self, datafile):
        if self.local(datafile.path):
            with open(datafile.path, 'rb') as f:
                datafile.handle.write(f.read())
        else:
            self._blob(datafile.path).download_to_file(datafile.handle)
        datafile.handle.seek(0)

    @_retry_with_backoff
    def _write(self, datafile):
        datafile.handle.seek(0)
        if self.local(datafile.path):
            dirname = os.path.dirname(datafile.path)
            if not os.path.isdir(dirname):
                os.makedirs(dirname)
            with open(datafile.path, 'wb') as f:
                f.write(datafile.handle.read())
        else:
            self._blob(datafile.path).upload_from_file(datafile.handle)


def _session_tempdir():
    """ Create a tempdir that will be cleaned up at session exit
    """
    tmpdir = tempfile.mkdtemp()
    # remove the tempdir and its contents when the interpreter exits
    atexit.register(lambda: shutil.rmtree(tmpdir))
    return tmpdir