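"""Copy an archived Splunk bucket into the configured GCS archival bucket.

The script reads the bucket's receipt.json for the object list and encryption
metadata, decrypts the customer-supplied encryption key via Cloud KMS, copies
the data objects from the source bucket into the archival bucket named in
data_archive.conf, uploads the updated receipt.json, and records the bucket's
metadata in the "<prefix>_BUCKET_HISTORY" table in Datastore.

Positional arguments (13, in order): index name, bucket path, remote path,
bucket id, bucket size, start time, end time, source bucket name, receipt.json
path, KMS project id, key location, key ring, and key name.
"""
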
import sys
import json
import os
import base64
import time
from google.cloud import storage
from google.cloud import datastore
from google.cloud import kms

import splunk.conf_util
import splunk.clilib.cli_common as comm

SPLUNK_HOME_PATH = os.environ.get('SPLUNK_HOME', '/opt/splunk')
DATA_ARCHIVE_CONF_PATH = os.path.join(SPLUNK_HOME_PATH, 'etc', comm.getAppDir(), '_cluster_admin', 'local', 'data_archive.conf')
ARCHIVAL_BUCKET_OPTION_NAME = 'archive'
STACK_ID_OPTION_NAME = 'prefix'

def check_if_in_archive(dst_blob, bid, new_key):
    """Check whether a given object already exists in the archival bucket.

    Exits with an error if the object is found. Returns True so the caller
    can skip this check for the remaining objects of the same bucket; if the
    existence check itself fails, the error is logged and the copy proceeds.
    """
    try:
        if dst_blob.exists():
            sys.exit(f'BucketId={bid} path={new_key} already exists!')
    except Exception as exc:
        sys.stdout.write(f'Unable to check if bucket exists bid={bid} path={new_key} error={exc}')
    return True

if __name__ == "__main__":
    # check parameters
    if len(sys.argv) < 14:
        sys.exit('missing arguments')

    # required params
    arg_index_name = sys.argv[1]
    arg_bucket_path = sys.argv[2]
    arg_remote_path = sys.argv[3]
    arg_bucket_id = sys.argv[4]
    arg_bucket_size = sys.argv[5]
    arg_start_time = sys.argv[6]
    arg_end_time = sys.argv[7]
    arg_bucket_name = sys.argv[8]
    arg_receipt_path = sys.argv[9]

    # Cloud KMS key coordinates used to decrypt the customer key
    arg_project_id = sys.argv[10]
    arg_key_locations = sys.argv[11]
    arg_key_ring = sys.argv[12]
    arg_key = sys.argv[13]

    if not os.path.exists(DATA_ARCHIVE_CONF_PATH):
        sys.exit('data_archive.conf not found at required path=' + DATA_ARCHIVE_CONF_PATH)

    archival_bucket_name = splunk.conf_util.ConfigMap(DATA_ARCHIVE_CONF_PATH)['buckets'][ARCHIVAL_BUCKET_OPTION_NAME]
    arg_table_name = splunk.conf_util.ConfigMap(DATA_ARCHIVE_CONF_PATH)['buckets'][STACK_ID_OPTION_NAME] + '_BUCKET_HISTORY'

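    # Expected receipt.json layout, as inferred from the fields read below
    # (a sketch of the assumed structure, not an exhaustive schema):
    #   {
    #     "objects":   [{"name": "/...", "size": <int>, "expand": <bool, optional>}, ...],
    #     "user_data": {"cipher_blob": "<base64>", "uploader_guid": "<guid>"},
    #     "manifest":  {"raw_size": <int>}
    #   }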
    # get file list and encryption info from receipt.json
    if not os.path.exists(arg_receipt_path):
        sys.exit('failed to locate updated receipt.json: BucketId=' + arg_bucket_id)

    fileList = ''
    cipher_blob = ''
    guid_context = ''
    rawSize = ''
    try:
        with open(arg_receipt_path) as json_data:
            data = json.load(json_data)
            fileList = data["objects"]
            cipher_blob = str(data["user_data"]["cipher_blob"])
            guid_context = str(data["user_data"]["uploader_guid"])
            rawSize = data["manifest"]["raw_size"]
    except Exception as exc:
        sys.exit('failed to get info from receipt.json: BucketId=' + arg_bucket_id + '; exception =' + str(exc))

    # decrypt the bucket's customer key with Cloud KMS, binding the
    # uploader GUID as additional authenticated data
    plaintext = ''
    try:
        kms_client = kms.KeyManagementServiceClient()
        key_name = kms_client.crypto_key_path(arg_project_id, arg_key_locations, arg_key_ring, arg_key)
        uploader = 'guid:' + guid_context
        decrypt_request = kms.DecryptRequest(
            name=key_name,
            ciphertext=base64.b64decode(cipher_blob),
            additional_authenticated_data=uploader.encode())
        decrypt_response = kms_client.decrypt(decrypt_request)
        plaintext = decrypt_response.plaintext
    except Exception as exc:
        sys.exit('failed to get customer key from receipt.json: BucketId=' + arg_bucket_id + '; exception =' + str(exc))

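    # The decrypted plaintext is used below as a customer-supplied encryption
    # key (CSEK): both source and destination blobs are constructed with
    # encryption_key=plaintext so rewrite() can read the encrypted source
    # objects and write the copies under the same key.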
    # copy data files in the bucket to the staging folder, skip receipt.json
    storage_client = storage.Client()

    old_prefix = arg_remote_path
    new_prefix = ''
    try:
        s = old_prefix.split('/', 1)
        new_prefix = s[0] + '/' + s[1]
    except Exception as exc:
        sys.exit('failed to get staging path from bucket path: ' + arg_remote_path + '; exception =' + str(exc))

    src_bucket = storage_client.bucket(arg_bucket_name)
    dst_bucket = storage_client.bucket(archival_bucket_name)
    processed_expand_files = []

    try:
        archive_checked = False
        for file in fileList:
            if file['size'] == 0:
                continue
            cur_file = file['name'][1:]
            cur_key = old_prefix + cur_file

            if file.get('expand', False):  # handle delete
                # expanded files: copy every object under the prefix, keeping its key
                if cur_file not in processed_expand_files:
                    list_result = list(src_bucket.list_blobs(prefix=cur_key, delimiter='/'))
                    for r in list_result:
                        src_blob = src_bucket.blob(r.name, encryption_key=plaintext)
                        des_blob = dst_bucket.blob(r.name, encryption_key=plaintext)
                        if not archive_checked:
                            archive_checked = check_if_in_archive(des_blob, arg_bucket_id, r.name)
                        # server-side rewrite; repeat until no continuation token remains
                        rewrite_token = None
                        while True:
                            rewrite_token, bytes_rewritten, total_bytes = des_blob.rewrite(src_blob, token=rewrite_token)
                            if rewrite_token is None:
                                break
                    processed_expand_files.append(cur_file)
            else:
                new_key = new_prefix + cur_file
                src_blob = src_bucket.blob(cur_key, encryption_key=plaintext)
                des_blob = dst_bucket.blob(new_key, encryption_key=plaintext)

                if not archive_checked:
                    archive_checked = check_if_in_archive(des_blob, arg_bucket_id, new_key)

                rewrite_token = None
                while True:
                    rewrite_token, bytes_rewritten, total_bytes = des_blob.rewrite(src_blob, token=rewrite_token)
                    if rewrite_token is None:
                        break
    except Exception as exc:
        sys.exit('failed to copy bucket to archival bucket: BucketId=' + arg_bucket_id + '; exception =' + str(exc))
    else:
        sys.stdout.write('successfully copied bucket to archival bucket; ')

    # upload receipt.json with restore flag
    try:
        receipt_key = new_prefix + '/receipt.json'
        receipt_blob = dst_bucket.blob(receipt_key)
        receipt_blob.upload_from_filename(arg_receipt_path)
    except Exception as exc:
        sys.exit('failed to copy updated receipt.json to archival bucket: BucketId=' + arg_bucket_id + '; exception =' + str(exc))
    else:
        sys.stdout.write('successfully uploaded receipt.json to archival bucket; ')

    # write bucket info to the bucket history table in Datastore;
    # the entity key is "<index>--<zero-padded start time>_<bucket id>"
    cur_time = str(int(time.time())).zfill(10)
    start_time = arg_start_time.zfill(10)

    try:
        datastore_client = datastore.Client()
        key = datastore_client.key(arg_table_name, str(arg_index_name + "--" + start_time + "_" + arg_bucket_id))
        new_bucket = datastore.Entity(key=key, exclude_from_indexes=['BucketPath', 'RemoteBucketPath', 'FileList',
                                                                     'BucketSize', 'RawSize', 'StartTime', 'BucketId'])

        new_bucket.update({
            'IndexName' : arg_index_name,
            'BucketPath': arg_bucket_path,
            'RemoteBucketPath': arg_remote_path,
            'BucketId'  : arg_bucket_id,
            'StartTime' : int(arg_start_time),
            'EndTime'   : int(arg_end_time),
            'BucketSize': int(arg_bucket_size),
            'FileList'  : json.dumps(fileList),
            'RawSize'   : int(rawSize),
            'ArchiveTimeWithBucketID': cur_time + "_" + arg_bucket_id,
            'BucketTimeSpan': int(arg_end_time) - int(arg_start_time)
        })
        datastore_client.put(new_bucket)
    except Exception as exc:
        sys.exit('failed to write bucket info to bucket history table: BucketId=' + arg_bucket_id + '; exception =' + str(exc))
    else:
        sys.stdout.write('successfully wrote bucket info to bucket history table')