Gsutil2
Download: gsutil2.py
Intro
gsutil is the official Google utility for file operations on GCS (Google Cloud Storage). Although it does have a parallel mode, that mode is limited in its use-cases, and it is not async. (Why async is a must-have for any file or network ops is covered elsewhere on this website.) Any large-scale data handling eventually has to have raw file storage; on gcloud, that storage is gcs. Hence an async version of gsutil that can work on batches of files is very valuable.
Usage
cat <input-file> | gsutil2 rm
cat <input-file> | gsutil2 mv
cat <input-file> | gsutil2 cp
The input file is a list of move/copy pairs, one space-delimited pair per line; for delete, it is a single path per line.
For example, a move or copy input file looks like this:
gs://bucket/path/to/file/file1 gs://bucket2/path/to/file/file1
gs://bucket/path/to/file/file1 gs://bucket2/path2/to/file/file2
...
A delete file would look like this:
gs://bucket/path/to/file/file1
gs://bucket/path/to/file/file2
...
The code plucks N lines at a time (default 200) from the input stream and invokes the cp/rm/mv command specified in async fashion.
The code takes two command-line options:
--num-lines:
Number of lines to read in one chunk; this is equivalent to how many concurrent async calls to make; default is 200.
--no-dry-run:
Specify this option to actually move / delete / copy files. Default is a dry-run.
e.g:
cat <input-file> | gsutil2 --no-dry-run --num-lines 400 rm
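Internally the batching is just itertools.islice over stdin. The snippet below is a condensed sketch of that loop; the function and argument names here are only for illustration, with handler standing in for do_rm / do_cp / do_mv from the full code further down.
# Condensed sketch of the chunked-stdin loop used by gsutil2 (see the full code below).
import sys
from itertools import islice

def process_in_batches (handler, n = 200):
    batch = list (islice (sys.stdin, n))       # read at most n lines without slurping the whole stream
    while batch:
        handler (batch)                        # each batch is executed concurrently by the handler
        batch = list (islice (sys.stdin, n))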
The code can be combined with GNU parallel to invoke several sets of N lines on separate processes.
This way you can pretty much maximize the IO throughput of the host machine.
cat move-file.txt | parallel -j 4 --pipe -N 800 gsutil2 --no-dry-run mv
This will invoke 4 processes and feed 800 lines from the input file to each of them.
Then, on each process, 200 lines will be picked up and executed in async fashion, then the next 200, and so on.
When all 800 are done, the process becomes available to parallel again and the next set of 800 lines is given to it.
As per parallel's documentation, it is better to use the --pipepart argument, which does the chunking on its own.
parallel --pipepart -a <move-file.txt> -j 4 gsutil2 --no-dry-run mv
However, in practice, this may be slower because it can create much smaller batch sizes, and those smaller batches do not fully utilize the async facility of the gsutil2 command. Using the --pipe option lets you specify the number of lines per chunk and hence gives better control over the batch sizes.
Tech
It uses the gcloud sdk and gevent for async operations.
Aside: Python has a plethora of options for async operations, all of them either half-baked, over-designed, or utterly convoluted in my opinion. After ruling out pretty much all of them (especially the built-in ones), I settled on the gevent library for async operations. It is simple, straightforward, and works without issues. I will probably cover why I think gevent gets the async model just right some other time.
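At its core the async pattern is just gevent's spawn/joinall plus monkey-patching, so that blocking calls yield to other greenlets. A minimal self-contained sketch of that pattern, with a sleep standing in for a blocking GCS call, looks like this:
from gevent import monkey; monkey.patch_all()       # patch blocking calls so they cooperatively yield
import gevent, time

def work (i):
    time.sleep (1)                                   # stand-in for a blocking GCS call (delete_one / copy_one / move_one below)
    print ('item %d done' % i)

jobs = [ gevent.spawn (work, i) for i in range (5) ]    # launch the whole batch concurrently
gevent.joinall (jobs)                                   # wait for every greenlet to finish
# finishes in about 1 second rather than 5 because the sleeps overlap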
Code
#! /usr/bin/env python3
'''
Copyright: 44systems.com
Permission:
This code is in the public domain.
It is provided "as is," without any guarantees or warranties.
You may use it freely, but you may not restrict
others from doing so or claim it as your own.
Thank you.
'''
'''
Invokes gsutil common ops (cp, rm, mv) in async fashion.
Reads input from stdin.
Each input line has one or two space-separated items (one item for delete paths, two items for move and copy paths).
Can be combined with `parallel` to invoke multiple processes; e.g:
parallel --pipepart -a <move-file.txt> -j 20 gsutil2 mv
Or (use pipe option; this was faster because of larger batch sizes per process despite piping itself being slower)
cat move-file.txt | parallel --pipe -N 800 gsutil2 --no-dry-run mv
Implementation Notes:
- Currently only implements bucket to bucket file copy operations.
Unlike gsutil, local to bucket or vice versa is not supported.
That is because supporting it requires an alternate implementation path
(using blob methods such as upload_from_file or download_to_file),
and I didn't run into that use-case, so it is not implemented.
'''
import gevent
from gevent import monkey; monkey.patch_all()
import os, sys, argparse
from itertools import islice
from google.cloud import storage as gcs
ARGS = None # global; will be initialized in main
GCS_CLIENT = gcs.Client()
class GCSHelper:
@staticmethod
def bucket_and_filepath (ls_line):
        bucket, filepath = ls_line.strip().split ('gs://', 1) [-1].split ('/', 1) # drop the gs:// prefix, then split once on '/' to get [bucket, filepath]
if filepath.endswith ('/'):
raise Exception ('filepath %s is a directory path; it should be a filepath and should end in the name of the file.' % filepath)
return bucket, filepath
@staticmethod
def delete_one (ls_line):
try:
bucket_name, filepath = GCSHelper.bucket_and_filepath (ls_line)
bucket = GCS_CLIENT.get_bucket (bucket_name)
blob = bucket.get_blob (filepath)
print (' Deleting: ', blob.name)
blob.delete()
except Exception as e:
print (e)
@staticmethod
def copy_one (src_ls_line, dst_ls_line):
        try:
            if not src_ls_line.strip().startswith ('gs://') or not dst_ls_line.strip().startswith ('gs://'):
                raise Exception ('Local to bucket or vice versa copy operation is not currently supported') # local to/from bucket is not supported
            src_bucket_name, src_filepath = GCSHelper.bucket_and_filepath (src_ls_line)
            dst_bucket_name, dst_filepath = GCSHelper.bucket_and_filepath (dst_ls_line)
src_bucket = GCS_CLIENT.get_bucket (src_bucket_name)
dst_bucket = GCS_CLIENT.get_bucket (dst_bucket_name)
src_blob = src_bucket.get_blob (src_filepath)
print (' Copying: %s to %s' % (src_ls_line, dst_ls_line))
src_bucket.copy_blob (src_blob, dst_bucket, new_name = dst_filepath)
except Exception as e:
print (e)
@staticmethod
def move_one (src_ls_line, dst_ls_line):
try:
src_bucket_name, src_filepath = GCSHelper.bucket_and_filepath (src_ls_line)
dst_bucket_name, dst_filepath = GCSHelper.bucket_and_filepath (dst_ls_line)
src_bucket = GCS_CLIENT.get_bucket (src_bucket_name)
src_blob = src_bucket.get_blob (src_filepath)
print (' Moving: %s to %s' % (src_ls_line, dst_ls_line))
src_bucket.rename_blob (src_blob, dst_filepath)
except Exception as e:
print (e)
# ====================================== Main methods ====================================================
def do_rm (file_paths):
if not ARGS.no_dry_run:
return
print ('Deleting batch: size %d ...' % len (file_paths))
jobs = [ gevent.spawn (GCSHelper.delete_one, line) for line in file_paths ]
gevent.joinall (jobs)
print ('Deleting batch: size %d ... done' % len (file_paths))
def do_cp (file_path_pairs):
if not ARGS.no_dry_run:
return
print ('Copying batch: size %d ...' % len (file_path_pairs))
pairs = (i.strip().split() for i in file_path_pairs) # Notice the generator syntax for lazy evaluation
jobs = [ gevent.spawn (GCSHelper.copy_one, src.strip(), dst.strip()) for (src, dst) in pairs ]
gevent.joinall (jobs)
print ('Copying batch: size %d ... done' % len (file_path_pairs))
def do_mv (file_path_pairs):
if not ARGS.no_dry_run:
return
print ('Moving batch: size %d ...' % len (file_path_pairs))
pairs = (i.strip().split() for i in file_path_pairs) # Notice the generator syntax for lazy evaluation
jobs = [ gevent.spawn (GCSHelper.move_one, src.strip(), dst.strip()) for (src, dst) in pairs ]
gevent.joinall (jobs)
print ('Moving batch: size %d ... done' % len (file_path_pairs))
# ================================ MAIN + ARGUMENTS ===============================================================
def parse_args():
# good subparser tutorial: https://towardsdatascience.com/a-simple-guide-to-command-line-arguments-with-argparse-6824c30ab1c3
NUM_LINES = 'Number of lines to read in one chunk; this is equivalent to how many concurrent async calls to make; default is 200.'
NO_DRY_RUN = 'Specify this option to actually move / delete / copy files. Default is a dry-run.'
# Create three subparsers for direct commands: rm, cp and mv
parser = argparse.ArgumentParser() # can take description and usage as arguments;
    subparser = parser.add_subparsers (dest = 'command', required = True) # require one of rm/cp/mv so CMD is never None in main()
rm = subparser.add_parser ('rm')
cp = subparser.add_parser ('cp')
mv = subparser.add_parser ('mv')
parser.add_argument ('--num-lines', '-n', type = int, default = 200, help = NUM_LINES)
parser.add_argument ('--no-dry-run', action = 'store_true', help = NO_DRY_RUN)
return parser.parse_args()
def main():
global ARGS
ARGS = parse_args()
N = ARGS.num_lines
#N = 3 #override during testing
CMD = None
if ARGS.command == 'rm':
CMD = do_rm
elif ARGS.command == 'cp':
CMD = do_cp
elif ARGS.command == 'mv':
CMD = do_mv
lines = list (islice (sys.stdin, N))
while lines:
CMD (lines)
print ('..................................')
lines = list (islice (sys.stdin, N))
def test():
    # Testing is subpar here. Not workable as is; modify / create test source files and then adjust accordingly.
# Does copy, move and delete into a test bucket. Verify final bucket to be empty.
# Cross bucket and local copy; local copy is not supported and should fail.
copy_list_1 = [ 'gs://some-test-bucket1/gsutil2-testing/test.txt gs://some-test-bucket2/gsutil2-testing/test.txt',
'gs://some-test-bucket1/gsutil2-testing/file_a.txt gs://some-test-bucket2/gsutil2-testing/', # should not get copied; we need to specify the filename
'gs://some-test-bucket1/gsutil2-testing/file_a.txt gs://some-test-bucket2/gsutil2-testing/file_b.txt' ]
#'test1.txt' gs://some-test-bucket2/gsutil2-testing/test-abc.txt', # local to bucket copy use-case (and vice-versa) is not supported.
do_cp (copy_list_1)
# Same bucket copy
copy_list_2 = [ 'gs://some-test-bucket2/gsutil2-testing/test.txt gs://some-test-bucket2/gsutil2-testing/test-newname.txt',
'gs://some-test-bucket2/gsutil2-testing/file_a.txt gs://some-test-bucket2/gsutil2-testing/t1/file_b_new.txt', # should not copy; since this source-file does not exist
'gs://some-test-bucket2/gsutil2-testing/file_b.txt gs://some-test-bucket2/gsutil2-testing/t1/file_b_new.txt' ]
do_cp (copy_list_2)
# Rename files
rename_list_1 = [ 'gs://some-test-bucket2/gsutil2-testing/test.txt gs://some-test-bucket2/gsutil2-testing/a.txt',
'gs://some-test-bucket2/gsutil2-testing/file_a.txt gs://some-test-bucket2/gsutil2-testing/b.txt' ] # should not happen; no such file
do_mv (rename_list_1)
# Delete files
delete_list_1 = [ 'gs://some-test-bucket2/gsutil2-testing/file_b.txt',
'gs://some-test-bucket2/gsutil2-testing/a.txt',
'gs://some-test-bucket2/gsutil2-testing/b.txt',
'gs://some-test-bucket2/gsutil2-testing/t1/file_b_new.txt',
'gs://some-test-bucket2/gsutil2-testing/test-newname.txt',
'gs://some-test-bucket2/gsutil2-testing/test-abc.txt' ]
do_rm (delete_list_1)
# some-test-bucket2/gsutil2-testing/ should be empty
bucket2 = GCS_CLIENT.get_bucket ('some-test-bucket2')
blobs = list (bucket2.list_blobs (prefix = 'gsutil2-testing'))
assert len (blobs) == 1, 'Test failed; target bucket path not empty' # 1 is for the test folder name itself which remains there.
if __name__ == "__main__":
main()
#test()