You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

183 lines
7.6 KiB

5 months ago
import sys
import json
import os
import base64
import time
from import storage
from import datastore
from import kms
import splunk.conf_util
import splunk.clilib.cli_common as comm
# Splunk installation root: $SPLUNK_HOME when set, else the default /opt/splunk.
SPLUNK_HOME_PATH = os.getenv('SPLUNK_HOME', '/opt/splunk')
# <SPLUNK_HOME>/etc/<comm.getAppDir()>/_cluster_admin/local/data_archive.conf,
# the config file the main script reads its [buckets] settings from.
DATA_ARCHIVE_CONF_PATH = os.path.join(
    SPLUNK_HOME_PATH,
    'etc',
    comm.getAppDir(),
    '_cluster_admin',
    'local',
    'data_archive.conf',
)
def check_if_in_archive(dst_blob, bid, new_key):
    """Check whether a destination object already exists in the archival bucket.

    The process is terminated via ``sys.exit`` when ``dst_blob`` already
    exists. If the existence lookup itself raises, the error is written to
    stdout and the caller is allowed to continue (best effort).

    Args:
        dst_blob: Destination blob; only its ``exists()`` method is called.
        bid: Bucket id, used only in the messages.
        new_key: Destination object path, used only in the messages.

    Returns:
        True once the check has been performed (whether or not the lookup
        succeeded), so the caller can remember that the archive was checked.

    Raises:
        SystemExit: if the destination object already exists.
    """
    try:
        already_archived = dst_blob.exists()
    except Exception as exc:
        # Deliberate best effort: a failed lookup is reported, not fatal.
        sys.stdout.write(f'Unable to check if bucket exists bid={bid} path={new_key} error={exc}')
        return True
    if already_archived:
        # Kept outside the try: SystemExit must never be mistaken for a lookup error.
        sys.exit(f'BucketId={bid} path={new_key} already exists!')
    return True
# Script entry point: copies one bucket's objects from arg_bucket_name into the
# archival bucket named in data_archive.conf, writes receipt.json there, and
# records the bucket in a Datastore "<stack id>_BUCKET_HISTORY" kind.
# NOTE(review): this copy of the file has lost its indentation and several
# lines/tokens (e.g. `try:` headers, `break`s, some call arguments, and the
# module path in the storage/datastore/kms imports — the calls used match the
# google.cloud client libraries, confirm). Each visible gap is flagged below;
# restore from version control before running.
if __name__ == "__main__":
# check parameters: sys.argv[0] plus the 13 positional arguments read below
if len(sys.argv) < 14:
sys.exit('missing arguments')
# required positional parameters, in order (sys.argv[1] .. sys.argv[13])
arg_index_name = sys.argv[1]
arg_bucket_path = sys.argv[2]
arg_remote_path = sys.argv[3]
arg_bucket_id = sys.argv[4]
arg_bucket_size = sys.argv[5]
arg_start_time = sys.argv[6]
arg_end_time = sys.argv[7]
arg_bucket_name = sys.argv[8]
arg_receipt_path = sys.argv[9]
arg_project_id = sys.argv[10]
arg_key_locations= sys.argv[11]
arg_key_ring = sys.argv[12]
arg_key = sys.argv[13]
if not os.path.exists(DATA_ARCHIVE_CONF_PATH):
sys.exit('data_archive.conf not found at required path=' + DATA_ARCHIVE_CONF_PATH)
# NOTE(review): ARCHIVAL_BUCKET_OPTION_NAME and STACK_ID_OPTION_NAME are not
# defined in the visible source; confirm they exist at module level.
# NOTE(review): data_archive.conf is parsed twice; one ConfigMap could serve both reads.
archival_bucket_name = splunk.conf_util.ConfigMap(DATA_ARCHIVE_CONF_PATH)['buckets'][ARCHIVAL_BUCKET_OPTION_NAME]
# Datastore kind name: stack id from the [buckets] stanza + '_BUCKET_HISTORY'.
arg_table_name = splunk.conf_util.ConfigMap(DATA_ARCHIVE_CONF_PATH)['buckets'][STACK_ID_OPTION_NAME] + '_BUCKET_HISTORY'
# get file list and encryption info from receipt.json
if not os.path.exists(arg_receipt_path):
sys.exit('failed to locate updated receipt.json: BucketId=' + arg_bucket_id)
fileList = ''
cipher_blob = ''
guid_context = ''
rawSize = ''
# NOTE(review): the `try:` matching the `except` below is missing from this copy.
with open(arg_receipt_path) as json_data:
data = json.load(json_data)
fileList = data["objects"]
cipher_blob = str(data["user_data"]["cipher_blob"])
guid_context = str(data["user_data"]["uploader_guid"])
rawSize = data["manifest"]["raw_size"]
except Exception as exc:
sys.exit('failed to get info from receipt.json: BucketId=' + arg_bucket_id + '; exception =' + str(exc))
# Decrypt cipher_blob (base64-decoded) with the Cloud KMS key identified by the
# project/location/key-ring/key arguments, passing 'guid:<uploader_guid>' as
# additional authenticated data. The resulting plaintext is used below as the
# `encryption_key` of every source and destination data blob.
plaintext = ''
# NOTE(review): the `try:` for the `except` below is missing; presumably it
# wraps these KMS calls — confirm.
kms_client = kms.KeyManagementServiceClient()
key_name = kms_client.crypto_key_path(arg_project_id, arg_key_locations, arg_key_ring, arg_key)
uploader = 'guid:' + guid_context
decrypt_request = kms.DecryptRequest(name=key_name, ciphertext=base64.b64decode(cipher_blob), additional_authenticated_data=uploader.encode())
decrypt_response = kms_client.decrypt(decrypt_request)
plaintext = decrypt_response.plaintext
except Exception as exc:
sys.exit('failed to get customer key from receipt.json: BucketId=' + arg_bucket_id + '; exception =' + str(exc))
# copy data files in the bucket to staging folder, skip receipt.json
storage_client = storage.Client()
old_prefix = arg_remote_path
new_prefix = ''
# NOTE(review): `try:` missing here too. Also, split('/', 1) followed by
# re-joining with '/' reproduces old_prefix unchanged, so new_prefix always
# equals arg_remote_path; the only visible effect is an IndexError (-> exit)
# when the path contains no '/'. Confirm whether a different staging prefix
# was intended.
s = old_prefix.split('/', 1)
new_prefix = s[0] + '/' + s[1]
except Exception as exc:
sys.exit('failed to get staging path from bucket path: ' + arg_remote_path + '; exception =' + str(exc))
src_bucket = storage_client.bucket(arg_bucket_name)
dst_bucket = storage_client.bucket(archival_bucket_name)
# Receipt names whose 'expand' prefix has been handled.
# NOTE(review): nothing visible appends to this list, so the membership test
# below never skips anything — confirm.
processed_expand_files = []
# Set by the first check_if_in_archive call so the existence check runs only once.
archive_checked = False
# NOTE(review): the `try:` matching the `except` after this loop is missing.
for file in fileList:
# NOTE(review): this `if` has no visible body — presumably `continue` to skip
# zero-size receipt entries; confirm.
if file['size'] == 0:
# cur_file drops the first character of the receipt name (presumably a
# leading '/'); cur_key is the full source object name.
cur_file = file['name'][1:]
cur_key = old_prefix + cur_file
if file.get('expand', False): # handle delete
# An 'expand' entry is treated as a prefix: every object listed directly
# under cur_key (delimiter='/') is copied.
if cur_file not in processed_expand_files:
list_result = list(src_bucket.list_blobs(prefix=cur_key, delimiter='/'))
for r in list_result:
# NOTE(review): the object-name arguments of the two blob() calls below are
# missing from this copy (presumably derived from r.name); restore them.
src_blob = src_bucket.blob(, encryption_key=plaintext)
des_blob = dst_bucket.blob(, encryption_key=plaintext)
if not archive_checked:
# NOTE(review): the next call is truncated after `arg_bucket_id,`.
archive_checked = check_if_in_archive(des_blob, arg_bucket_id,
rewrite_token = None
# Copy via rewrite(), re-calling with the returned token until it is None.
# NOTE(review): the `break` under `if rewrite_token is None:` is missing.
while True:
rewrite_token, bytes_rewritten, total_bytes = des_blob.rewrite(src_blob, token=rewrite_token)
if rewrite_token is None:
# Single-object entry: copy cur_key to the same relative name under new_prefix.
# NOTE(review): an `else:` pairing with the 'expand' check appears to be
# missing before this branch — confirm against history.
new_key = new_prefix + cur_file
src_blob = src_bucket.blob(cur_key, encryption_key=plaintext)
des_blob = dst_bucket.blob(new_key, encryption_key=plaintext)
if not archive_checked:
archive_checked = check_if_in_archive(des_blob, arg_bucket_id, new_key)
rewrite_token = None
# NOTE(review): `break` missing under the `if` below; bytes_rewritten and
# total_bytes are unused.
while True:
rewrite_token, bytes_rewritten, total_bytes = des_blob.rewrite(src_blob, token=rewrite_token)
if rewrite_token is None:
except Exception as exc:
sys.exit('failed to copy bucket to archival bucket: BucketId=' + arg_bucket_id + '; exception =' + str(exc))
sys.stdout.write('successfully copied bucket to archival bucket; ')
# upload receipt.json with restore flag
# NOTE(review): the `try:` header and the call that actually writes
# receipt_blob (presumably uploading arg_receipt_path) are missing; as visible,
# receipt_blob is created but never written. Unlike the data blobs it has no
# encryption_key, and the '/receipt.json' suffix may yield '//' if new_prefix
# already ends with '/' — confirm.
receipt_key = new_prefix + '/receipt.json'
receipt_blob = dst_bucket.blob(receipt_key)
except Exception as exc:
sys.exit('failed to copy updated receipt.json to archival bucket: BucketId=' + arg_bucket_id + '; exception =' + str(exc))
sys.stdout.write('successfully uploaded receipt.json to archival bucket; ')
# write bucket info to table
# Current and start times as 10-digit zero-padded epoch strings (presumably so
# the composite strings below sort lexically in time order).
cur_time = str(int(time.time())).zfill(10)
start_time = arg_start_time.zfill(10)
datastore_client = datastore.Client()
# Entity key name: '<index>--<padded start time>_<bucket id>'.
key = datastore_client.key(arg_table_name, str(arg_index_name + "--" + start_time + "_" + arg_bucket_id))
# NOTE(review): missing from this copy: the `try:` header, the
# `new_bucket.update({` line that opens the property mapping below, and the
# closing `})` plus the call that saves the entity (presumably
# datastore_client.put(new_bucket)). As visible, nothing is written.
new_bucket = datastore.Entity(key=key, exclude_from_indexes=['BucketPath', 'RemoteBucketPath', 'FileList',
'BucketSize', 'RawSize', 'StartTime', 'BucketId'])
'IndexName' : arg_index_name,
'BucketPath': arg_bucket_path,
'RemoteBucketPath': arg_remote_path,
'BucketId' : arg_bucket_id,
'StartTime' : int(arg_start_time),
'EndTime' : int(arg_end_time),
'BucketSize': int(arg_bucket_size),
'FileList' : json.dumps(fileList),
'RawSize' : int(rawSize),
'ArchiveTimeWithBucketID': cur_time + "_" + arg_bucket_id,
'BucketTimeSpan': int(arg_end_time) - int(arg_start_time)
except Exception as exc:
sys.exit('failed to write bucket info to bucket history table: BucketId=' + arg_bucket_id + '; exception =' + str(exc))
sys.stdout.write('successfully wrote bucket info to bucket history table')

Powered by BW's shoe-string budget.