import sys
import json
import os
import base64
import time
from google.cloud import storage
from google.cloud import datastore
from google.cloud import kms
import splunk.conf_util
import splunk.clilib.cli_common as comm
SPLUNK_HOME_PATH = os.environ.get('SPLUNK_HOME', '/opt/splunk')
DATA_ARCHIVE_CONF_PATH = os.path.join(SPLUNK_HOME_PATH, 'etc', comm.getAppDir(), '_cluster_admin', 'local', 'data_archive.conf')
ARCHIVAL_BUCKET_OPTION_NAME = 'archive'
STACK_ID_OPTION_NAME = 'prefix'
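# data_archive.conf is expected to provide a [buckets] stanza naming the
# archival bucket and the stack prefix; illustrative contents only (actual
# values are deployment-specific):
#
#   [buckets]
#   archive = my-archival-bucket
#   prefix = my-stack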
def check_if_in_archive(dst_blob, bid, new_key):
    """Check whether the given object already exists in the archival bucket.

    Exit if the object is found. If the existence check itself fails, log
    the error and continue; always return True so the caller probes the
    archive only once per bucket.
    """
    try:
        if dst_blob.exists():
            sys.exit(f'BucketId={bid} path={new_key} already exists!')
    except Exception as exc:
        sys.stdout.write(f'Unable to check if object exists bid={bid} path={new_key} error={exc}')
    return True
if __name__ == "__main__":
    # check parameters
    if len(sys.argv) < 14:
        sys.exit('missing arguments')
    # required params
    arg_index_name = sys.argv[1]
    arg_bucket_path = sys.argv[2]
    arg_remote_path = sys.argv[3]
    arg_bucket_id = sys.argv[4]
    arg_bucket_size = sys.argv[5]
    arg_start_time = sys.argv[6]
    arg_end_time = sys.argv[7]
    arg_bucket_name = sys.argv[8]
    arg_receipt_path = sys.argv[9]
    arg_project_id = sys.argv[10]
    arg_key_locations = sys.argv[11]
    arg_key_ring = sys.argv[12]
    arg_key = sys.argv[13]
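    # Illustrative invocation (the script name and every value below are
    # hypothetical; the real caller supplies deployment-specific values):
    #   python archive_to_gcs.py main \
    #       /opt/splunk/var/lib/splunk/main/db/db_1563500000_1563400000_42 \
    #       stack/main/db_1563500000_1563400000_42/ 42 10485760 \
    #       1563400000 1563500000 source-bucket /tmp/receipt.json \
    #       my-project us-central1 my-key-ring my-key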
    if not os.path.exists(DATA_ARCHIVE_CONF_PATH):
        sys.exit('data_archive.conf not found at required path=' + DATA_ARCHIVE_CONF_PATH)
    archive_conf = splunk.conf_util.ConfigMap(DATA_ARCHIVE_CONF_PATH)
    archival_bucket_name = archive_conf['buckets'][ARCHIVAL_BUCKET_OPTION_NAME]
    arg_table_name = archive_conf['buckets'][STACK_ID_OPTION_NAME] + '_BUCKET_HISTORY'
    # get file list and encryption info from receipt.json
    if not os.path.exists(arg_receipt_path):
        sys.exit('failed to locate updated receipt.json: BucketId=' + arg_bucket_id)
    fileList = ''
    cipher_blob = ''
    guid_context = ''
    rawSize = ''
    try:
        with open(arg_receipt_path) as json_data:
            data = json.load(json_data)
            fileList = data["objects"]
            cipher_blob = str(data["user_data"]["cipher_blob"])
            guid_context = str(data["user_data"]["uploader_guid"])
            rawSize = data["manifest"]["raw_size"]
    except Exception as exc:
        sys.exit('failed to get info from receipt.json: BucketId=' + arg_bucket_id + '; exception =' + str(exc))
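    # Assumed receipt.json shape, inferred only from the fields read above
    # (real receipts may carry additional fields):
    #   {
    #     "objects":   [{"name": "/rawdata/journal.gz", "size": 1024}, ...],
    #     "user_data": {"cipher_blob": "<base64 wrapped key>", "uploader_guid": "<guid>"},
    #     "manifest":  {"raw_size": 2048}
    #   }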
    plaintext = ''
    try:
        kms_client = kms.KeyManagementServiceClient()
        key_name = kms_client.crypto_key_path(arg_project_id, arg_key_locations, arg_key_ring, arg_key)
        uploader = 'guid:' + guid_context
        decrypt_request = kms.DecryptRequest(name=key_name,
                                             ciphertext=base64.b64decode(cipher_blob),
                                             additional_authenticated_data=uploader.encode())
        decrypt_response = kms_client.decrypt(request=decrypt_request)
        plaintext = decrypt_response.plaintext
    except Exception as exc:
        sys.exit('failed to get customer key from receipt.json: BucketId=' + arg_bucket_id + '; exception =' + str(exc))
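    # cipher_blob is the customer key wrapped by Cloud KMS. KMS releases the
    # plaintext only when the additional authenticated data (AAD) supplied at
    # decrypt time matches the AAD used at encrypt time, so the wrapped key
    # is bound to the uploader's GUID.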
    # copy data files in the bucket to staging folder, skip receipt.json
    storage_client = storage.Client()
    old_prefix = arg_remote_path
    new_prefix = ''
    try:
        # The split/rejoin leaves the prefix unchanged; its purpose is to
        # abort (via IndexError) when the remote path has no '/' separator.
        s = old_prefix.split('/', 1)
        new_prefix = s[0] + '/' + s[1]
    except Exception as exc:
        sys.exit('failed to get staging path from bucket path: ' + arg_remote_path + '; exception =' + str(exc))
    src_bucket = storage_client.bucket(arg_bucket_name)
    dst_bucket = storage_client.bucket(archival_bucket_name)
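    # The decrypted key is used below as a customer-supplied encryption key
    # (CSEK): GCS expects a 32-byte AES-256 key, and the rewrite needs it on
    # both the source blob (to read) and the destination blob (to write).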
    processed_expand_files = []
    try:
        archive_checked = False
        for file in fileList:
            if file['size'] == 0:
                continue
            cur_file = file['name'][1:]  # drop the leading '/'
            cur_key = old_prefix + cur_file
            if file.get('expand', False):
                # 'expand' entries name a prefix holding multiple objects,
                # so copy every blob found directly under it
                if cur_file not in processed_expand_files:
                    list_result = list(src_bucket.list_blobs(prefix=cur_key, delimiter='/'))
                    for r in list_result:
                        src_blob = src_bucket.blob(r.name, encryption_key=plaintext)
                        des_blob = dst_bucket.blob(r.name, encryption_key=plaintext)
                        if not archive_checked:
                            archive_checked = check_if_in_archive(des_blob, arg_bucket_id, r.name)
                        # rewrite() copies server-side in chunks; loop until
                        # it stops returning a continuation token
                        rewrite_token = None
                        while True:
                            rewrite_token, bytes_rewritten, total_bytes = des_blob.rewrite(src_blob, token=rewrite_token)
                            if rewrite_token is None:
                                break
                    processed_expand_files.append(cur_file)
            else:
                new_key = new_prefix + cur_file
                src_blob = src_bucket.blob(cur_key, encryption_key=plaintext)
                des_blob = dst_bucket.blob(new_key, encryption_key=plaintext)
                if not archive_checked:
                    archive_checked = check_if_in_archive(des_blob, arg_bucket_id, new_key)
                rewrite_token = None
                while True:
                    rewrite_token, bytes_rewritten, total_bytes = des_blob.rewrite(src_blob, token=rewrite_token)
                    if rewrite_token is None:
                        break
    except Exception as exc:
        sys.exit('failed to copy bucket to archival bucket: BucketId=' + arg_bucket_id + '; exception =' + str(exc))
    else:
        sys.stdout.write('successfully copied bucket to archival bucket; ')
    # upload receipt.json with restore flag
    try:
        receipt_key = new_prefix + '/receipt.json'
        receipt_blob = dst_bucket.blob(receipt_key)
        receipt_blob.upload_from_filename(arg_receipt_path)
    except Exception as exc:
        sys.exit('failed to copy updated receipt.json to archival bucket: BucketId=' + arg_bucket_id + '; exception =' + str(exc))
    else:
        sys.stdout.write('successfully uploaded receipt.json to archival bucket; ')
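    # The copy is recorded below in a Datastore kind named
    # <prefix>_BUCKET_HISTORY. Large text properties such as FileList are
    # excluded from indexes, presumably because indexed string properties are
    # capped at 1500 bytes while unindexed ones may hold up to ~1 MiB.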
    # write bucket info to table
    cur_time = str(int(time.time())).zfill(10)
    start_time = arg_start_time.zfill(10)
    try:
        datastore_client = datastore.Client()
        key = datastore_client.key(arg_table_name, str(arg_index_name + "--" + start_time + "_" + arg_bucket_id))
        new_bucket = datastore.Entity(key=key, exclude_from_indexes=['BucketPath', 'RemoteBucketPath', 'FileList',
                                                                     'BucketSize', 'RawSize', 'StartTime', 'BucketId'])
        new_bucket.update({
            'IndexName': arg_index_name,
            'BucketPath': arg_bucket_path,
            'RemoteBucketPath': arg_remote_path,
            'BucketId': arg_bucket_id,
            'StartTime': int(arg_start_time),
            'EndTime': int(arg_end_time),
            'BucketSize': int(arg_bucket_size),
            'FileList': json.dumps(fileList),
            'RawSize': int(rawSize),
            'ArchiveTimeWithBucketID': cur_time + "_" + arg_bucket_id,
            'BucketTimeSpan': int(arg_end_time) - int(arg_start_time)
        })
        datastore_client.put(new_bucket)
    except Exception as exc:
        sys.exit('failed to write bucket info to bucket history table: BucketId=' + arg_bucket_id + '; exception =' + str(exc))
    else:
        sys.stdout.write('successfully wrote bucket info to bucket history table')
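    # Note: nothing above deletes the source objects; the script only copies
    # them to the archival bucket and records the copy in the history table.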
