Gsutil2

2020

Download: gsutil2.py

Intro

gsutil is Google's official utility for file operations on GCS (Google Cloud Storage). Although it does have a parallel mode, that mode is limited in its use-cases, and it is not async. (Why async is a must-have for any file or network ops is covered elsewhere on this website.) Any large-scale data handling eventually needs raw file storage, and on Google Cloud that storage is GCS. Hence an async version of gsutil that can work on batches of files is very valuable.

Usage

cat <input-file> | gsutil2 rm
cat <input-file> | gsutil2 mv
cat <input-file> | gsutil2 cp

The input file is a list of move/copy pairs, one space-delimited pair per line; for delete, it is a single path per line.

For example, a move or copy file would look like this:

gs://bucket/path/to/file/file1  gs://bucket2/path/to/file/file1
gs://bucket/path/to/file/file1  gs://bucket2/path2/to/file/file2
...

A delete file would look like this:

gs://bucket/path/to/file/file1
gs://bucket/path/to/file/file2
...

The code plucks N lines at a time (default 200) from the input stream and invokes the specified cp/rm/mv command on them in async fashion.
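
In other words, the core loop is just this pattern (a minimal sketch; handle_batch is a stand-in for the actual cp/rm/mv handlers in the full code below):

import sys
from itertools import islice

def handle_batch (lines):                   # stand-in for the real do_cp / do_rm / do_mv
    print ('processing %d lines' % len (lines))

N = 200                                     # batch size; --num-lines in the real tool
batch = list (islice (sys.stdin, N))        # pluck up to N lines from stdin
while batch:
    handle_batch (batch)
    batch = list (islice (sys.stdin, N))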

The code takes two command-line options:

--num-lines:
    Number of lines to read in one chunk; this is equivalent to how many concurrent async calls to make; default is 200.

--no-dry-run:
    Specify this option to actually move / delete / copy files. Default is a dry-run.

e.g.:

cat <input-file> | gsutil2 --no-dry-run --num-lines 400 rm

The code can be combined with GNU parallel to run several sets of N lines on separate processes. This way you can pretty much maximize the IO throughput of the host machine.

cat move-file.txt | parallel -j 4 --pipe -N 800 gsutil2 --no-dry-run mv

This will invoke 4 processes and feed 800 lines from the input file to each of them. Each process then picks up 200 lines at a time and executes them in async fashion, then the next 200, and so on. When all 800 are done, the process becomes available to parallel again and receives the next set of 800 lines.

As per parallel's documentation, it is better to use the --pipepart argument, which does the chunking on its own.

parallel --pipepart -a <move-file.txt> -j 4 gsutil2 --no-dry-run mv

However, in practice this may be slower because it can produce much smaller batches, which do not fully utilize the async facility of gsutil2. The --pipe option allows specifying the number of lines per chunk, which gives better control over the batch sizes.
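
If you do want --pipepart, its --block option can be used to make the chunks larger; the best block size depends on your line lengths, so the 10M below is only an illustrative guess:

parallel --pipepart -a <move-file.txt> --block 10M -j 4 gsutil2 --no-dry-run mv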

Tech

It uses the Google Cloud Storage Python SDK (google-cloud-storage) and gevent for async operations.

Aside: Python has a plethora of options for async operations - all of them, in my opinion, either half-baked, over-designed, or utterly convoluted. After ruling out pretty much all of them (especially including the built-in ones) I settled on the gevent library. It is simple, straightforward, and works without issues. I will probably cover why I think gevent gets the async model just right some other time.
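
For a feel of that model, here is a minimal gevent sketch (not taken from the tool; example.com is a placeholder): monkey-patch the standard library so blocking IO yields to other greenlets, spawn one greenlet per task, then join them all.

from gevent import monkey; monkey.patch_all()   # make blocking socket IO cooperative
import gevent
import urllib.request

def fetch (url):
    return urllib.request.urlopen (url).read()

urls = ['https://example.com'] * 3              # placeholder URLs
jobs = [ gevent.spawn (fetch, u) for u in urls ]    # one greenlet per fetch
gevent.joinall (jobs)                           # all three fetches run concurrently
print ([ len (j.value) for j in jobs ])

This spawn/joinall pattern is exactly what the script below uses for its GCS calls.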

Code

#! /usr/bin/env python3

'''
Copyright: 44systems.com

Permission:
    This code is in the public domain.
    It is provided "as is," without any guarantees or warranties.
    You may use it freely, but you may not restrict
    others from doing so or claim it as your own.
    Thank you.
'''

'''
Invokes gsutil common ops (cp, rm, mv) in async fashion.
Reads input from stdin.
Input lines contain one or two items (1 item for delete paths, 2 items for move and copy paths).
Can be combined with `parallel` to invoke multiple processes; e.g:

    parallel --pipepart -a <move-file.txt> -j 20 gsutil2 --no-dry-run mv

Or use the --pipe option (this was faster in practice because of larger batch sizes per process, despite piping itself being slower):

    cat move-file.txt | parallel --pipe -N 800 gsutil2 --no-dry-run mv

Implementation Notes:

- Currently only implements bucket-to-bucket file operations.
Unlike gsutil, local-to-bucket copies (and vice versa) are not supported.
Supporting them would require an alternate implementation path
(using blob methods like upload_from_file or download_to_file),
and I didn't run into that use-case, so it is not implemented.
'''


import gevent
from gevent import monkey; monkey.patch_all()

import sys, argparse
from itertools import islice
from google.cloud import storage as gcs


ARGS        = None     # global; will be initialized in main
GCS_CLIENT  = gcs.Client()


class GCSHelper:
    @staticmethod
    def bucket_and_filepath (ls_line):
        ls_line = ls_line.strip()
        if not ls_line.startswith ('gs://'):
            raise Exception ('%s is not a gs:// path; local to/from bucket paths are not supported.' % ls_line)
        bucket, filepath = ls_line [len ('gs://'):].split ('/', 1)     # maxsplit=1 splits at the first '/', giving (bucket, filepath)
        if filepath.endswith ('/'):
            raise Exception ('filepath %s is a directory path; it should be a filepath and should end in the name of the file.' % filepath)
        return  bucket, filepath

    @staticmethod
    def delete_one (ls_line):
        try:
            bucket_name, filepath = GCSHelper.bucket_and_filepath (ls_line)
            bucket                = GCS_CLIENT.get_bucket (bucket_name)
            blob                  = bucket.get_blob (filepath)
            if blob is None:                    # get_blob returns None for a missing object
                raise Exception ('No such file: %s' % ls_line.strip())
            print ('    Deleting: ', blob.name)
            blob.delete()
        except Exception as e:
            print (e)

    @staticmethod
    def copy_one (src_ls_line, dst_ls_line):
        try:
            src_bucket_name, src_filepath = GCSHelper.bucket_and_filepath (src_ls_line)
            dst_bucket_name, dst_filepath = GCSHelper.bucket_and_filepath (dst_ls_line)
            # Local paths (anything without 'gs://') are rejected in bucket_and_filepath above,
            # so local to/from bucket copies never reach this point.
            src_bucket                    = GCS_CLIENT.get_bucket (src_bucket_name)
            dst_bucket                    = GCS_CLIENT.get_bucket (dst_bucket_name)
            src_blob                      = src_bucket.get_blob (src_filepath)
            if src_blob is None:                # get_blob returns None for a missing object
                raise Exception ('No such source file: %s' % src_ls_line.strip())
            print ('    Copying: %s to %s' % (src_ls_line, dst_ls_line))
            src_bucket.copy_blob (src_blob, dst_bucket, new_name = dst_filepath)
        except Exception as e:
            print (e)

    @staticmethod
    def move_one (src_ls_line, dst_ls_line):
        try:
            src_bucket_name, src_filepath = GCSHelper.bucket_and_filepath (src_ls_line)
            dst_bucket_name, dst_filepath = GCSHelper.bucket_and_filepath (dst_ls_line)
            if src_bucket_name != dst_bucket_name:
                # rename_blob only renames within a single bucket; a cross-bucket move would need copy + delete
                raise Exception ('Cross-bucket move from %s to %s is not supported.' % (src_bucket_name, dst_bucket_name))
            src_bucket                    = GCS_CLIENT.get_bucket (src_bucket_name)
            src_blob                      = src_bucket.get_blob (src_filepath)
            if src_blob is None:                # get_blob returns None for a missing object
                raise Exception ('No such source file: %s' % src_ls_line.strip())
            print ('    Moving: %s to %s' % (src_ls_line, dst_ls_line))
            src_bucket.rename_blob (src_blob, dst_filepath)
        except Exception as e:
            print (e)


# ====================================== Main methods ====================================================
def do_rm (file_paths):
    if not ARGS.no_dry_run:
        print ('Dry-run: would delete %d files; pass --no-dry-run to execute.' % len (file_paths))
        return
    print ('Deleting batch: size %d ...' % len (file_paths))
    jobs = [ gevent.spawn (GCSHelper.delete_one, line) for line in file_paths ]
    gevent.joinall (jobs)
    print ('Deleting batch: size %d ... done' % len (file_paths))


def do_cp (file_path_pairs):
    if not ARGS.no_dry_run:
        print ('Dry-run: would copy %d files; pass --no-dry-run to execute.' % len (file_path_pairs))
        return
    print ('Copying batch: size %d ...' % len (file_path_pairs))
    pairs = (i.strip().split() for i in file_path_pairs)        # Notice the generator syntax for lazy evaluation
    jobs  = [ gevent.spawn (GCSHelper.copy_one, src.strip(), dst.strip()) for (src, dst) in pairs ]
    gevent.joinall (jobs)
    print ('Copying batch: size %d ... done' % len (file_path_pairs))


def do_mv (file_path_pairs):
    if not ARGS.no_dry_run:
        print ('Dry-run: would move %d files; pass --no-dry-run to execute.' % len (file_path_pairs))
        return
    print ('Moving batch: size %d ...' % len (file_path_pairs))
    pairs = (i.strip().split() for i in file_path_pairs)        # Notice the generator syntax for lazy evaluation
    jobs  = [ gevent.spawn (GCSHelper.move_one, src.strip(), dst.strip()) for (src, dst) in pairs ]
    gevent.joinall (jobs)
    print ('Moving batch: size %d ... done' % len (file_path_pairs))


# ================================ MAIN + ARGUMENTS ===============================================================
def parse_args():
    # good subparser tutorial: https://towardsdatascience.com/a-simple-guide-to-command-line-arguments-with-argparse-6824c30ab1c3

    NUM_LINES   = 'Number of lines to read in one chunk; this is equivalent to how many concurrent async calls to make; default is 200.'
    NO_DRY_RUN  = 'Specify this option to actually move / delete / copy files. Default is a dry-run.'

    # Create three subparsers for direct commands: rm, cp and mv
    parser      = argparse.ArgumentParser()  # can take description and usage as arguments;
    subparser   = parser.add_subparsers (dest = 'command', required = True)     # one of rm/cp/mv is required
    rm          = subparser.add_parser ('rm')
    cp          = subparser.add_parser ('cp')
    mv          = subparser.add_parser ('mv')

    parser.add_argument ('--num-lines', '-n',   type = int,     default = 200,  help = NUM_LINES)
    parser.add_argument ('--no-dry-run',        action = 'store_true',          help = NO_DRY_RUN)

    return parser.parse_args()


def main():
    global ARGS

    ARGS = parse_args()
    N    =  ARGS.num_lines
    #N    = 3   #override during testing

    CMD  = None
    if ARGS.command == 'rm':
        CMD = do_rm
    elif ARGS.command == 'cp':
        CMD = do_cp
    elif ARGS.command == 'mv':
        CMD = do_mv

    lines = list (islice (sys.stdin, N))
    while lines:
        CMD (lines)
        print ('..................................')
        lines = list (islice (sys.stdin, N))


def test():
    # Testing is subpar here. Not workable as is; modify / create test source files and then adjust accordingly.
    # Does copy, move and delete into a test bucket. Verify final bucket to be empty.

    # Cross bucket and local copy; local copy is not supported and should fail.
    copy_list_1 = [ 'gs://some-test-bucket1/gsutil2-testing/test.txt     gs://some-test-bucket2/gsutil2-testing/test.txt',
                    'gs://some-test-bucket1/gsutil2-testing/file_a.txt   gs://some-test-bucket2/gsutil2-testing/',    # should not get copied; we need to specify the filename
                    'gs://some-test-bucket1/gsutil2-testing/file_a.txt   gs://some-test-bucket2/gsutil2-testing/file_b.txt' ]
                     #'test1.txt   gs://some-test-bucket2/gsutil2-testing/test-abc.txt',    # local to bucket copy (and vice versa) is not supported.

    do_cp (copy_list_1)

    # Same bucket copy
    copy_list_2 = [ 'gs://some-test-bucket2/gsutil2-testing/test.txt     gs://some-test-bucket2/gsutil2-testing/test-newname.txt',
                    'gs://some-test-bucket2/gsutil2-testing/file_a.txt   gs://some-test-bucket2/gsutil2-testing/t1/file_b_new.txt', # should not copy; since this source-file does not exist
                    'gs://some-test-bucket2/gsutil2-testing/file_b.txt   gs://some-test-bucket2/gsutil2-testing/t1/file_b_new.txt' ]

    do_cp (copy_list_2)

    # Rename files
    rename_list_1 = [   'gs://some-test-bucket2/gsutil2-testing/test.txt      gs://some-test-bucket2/gsutil2-testing/a.txt',
                        'gs://some-test-bucket2/gsutil2-testing/file_a.txt    gs://some-test-bucket2/gsutil2-testing/b.txt' ]  # should not happen; no such file
    do_mv (rename_list_1)

    # Delete files
    delete_list_1 = [   'gs://some-test-bucket2/gsutil2-testing/file_b.txt',
                        'gs://some-test-bucket2/gsutil2-testing/a.txt',
                        'gs://some-test-bucket2/gsutil2-testing/b.txt',
                        'gs://some-test-bucket2/gsutil2-testing/t1/file_b_new.txt',
                        'gs://some-test-bucket2/gsutil2-testing/test-newname.txt',
                        'gs://some-test-bucket2/gsutil2-testing/test-abc.txt' ]
    do_rm (delete_list_1)

    # some-test-bucket2/gsutil2-testing/ should be empty
    bucket2 = GCS_CLIENT.get_bucket ('some-test-bucket2')
    blobs = list (bucket2.list_blobs (prefix = 'gsutil2-testing'))
    assert len (blobs) == 1, 'Test failed; target bucket path not empty'    # 1 is for the test folder name itself which remains there.


if __name__ == "__main__":
    main()
    #test()