"""Restart a Splunk cluster manager while preserving knowledge of frozen buckets.

Default mode snapshots the manager's bucket list to ``BUCKET_LIST_PATH``,
restarts the manager, waits for the peers that were up to rejoin, and then
re-freezes every bucket marked frozen in the snapshot.  ``--get_list`` only
takes the snapshot; ``--freeze_from PATH`` only re-freezes from a snapshot.
"""
import httplib2
try:
    from urllib.parse import urlencode  # Python 3
except ImportError:
    from urllib import urlencode  # Python 2
import splunk
import splunk.rest
import splunk.rest.format
# NOTE(review): presumably supplies BucketHandler and parse (used by
# freeze_from_file) — confirm against parse_xml_buckets.
from parse_xml_buckets import *
import json
import re
import time
import os
import subprocess
from optparse import OptionParser

# Before restarting the manager, the bucket list is stored under
# $SPLUNK_HOME/var/run/splunk/cluster/buckets.xml.
BUCKET_LIST_PATH = os.path.join(os.environ['SPLUNK_HOME'], 'var', 'run', 'splunk', 'cluster', 'buckets.xml')

# Marker counted in the peers Atom feed to find peers whose status is Up.
_PEER_UP_PATTERN = re.compile(r'"status">Up')


def _to_bytes(content):
    """Return *content* as bytes (response bodies may already be bytes, e.g. on Python 3)."""
    if isinstance(content, bytes):
        return content
    return content.encode('utf-8')


def _to_text(content):
    """Return *content* as text, decoding a bytes response body as UTF-8."""
    if isinstance(content, bytes):
        return content.decode('utf-8', 'replace')
    return content


def get_buckets_list(master_uri, auth):
    """Fetch every bucket known to the manager and save the raw Atom XML to BUCKET_LIST_PATH."""
    # Fetch before opening the file so a failed request cannot truncate an
    # existing snapshot.
    atom_buckets = get_xml_feed(master_uri + '/services/cluster/master/buckets?count=-1', auth, 'GET')
    directory = os.path.dirname(BUCKET_LIST_PATH)
    if not os.path.isdir(directory):
        os.makedirs(directory)
    # Binary mode: write the response body exactly as received.
    with open(BUCKET_LIST_PATH, 'wb') as f:
        f.write(_to_bytes(atom_buckets))


def change_quiet_period(master_uri, auth, quiet_period=600):
    """POST ``quiet_period`` (default 600) to the manager's cluster/config/system endpoint.

    NOTE(review): presumably this gives peers more time to re-register after the
    restart — confirm against the Splunk server.conf documentation.

    Returns the parsed response feed.
    """
    url = (master_uri + '/services/cluster/config/system?'
           + urlencode({'quiet_period': quiet_period}))
    return get_response_feed(url, auth, 'POST')


def num_peers_up(master_uri, auth):
    """Return how many peers the manager's peers feed reports with status Up."""
    atom_peers = _to_text(get_xml_feed(master_uri + '/services/cluster/master/peers?count=-1', auth, 'GET'))
    # Count the raw '"status">Up' markers directly in the response; the pattern
    # contains no newline, so this equals a line-by-line count.
    return len(_PEER_UP_PATTERN.findall(atom_peers))


def wait_for_peers(master_uri, auth, original_number, poll_interval=5):
    """Poll every *poll_interval* seconds until at least *original_number* peers are Up.

    Uses ``<`` rather than ``!=`` so that the loop also terminates when more
    peers come up than were up before the restart.
    """
    peers_up = num_peers_up(master_uri, auth)
    while peers_up < original_number:
        print("Still waiting for " + str(original_number - peers_up) + " peers to join ...")
        time.sleep(poll_interval)
        # Query once per iteration so the check and the message agree.
        peers_up = num_peers_up(master_uri, auth)
    print("All peers have joined")


def get_response_feed(url, auth, method='GET', body=None):
    """Request *url* (see get_xml_feed) and return the body parsed as a Splunk Atom feed."""
    content = get_xml_feed(url, auth, method, body)
    return splunk.rest.format.parseFeedDocument(content)


def get_xml_feed(url, auth, method='GET', body=None):
    """Send an authenticated request to *url* and return the raw response body.

    Args:
        url: full REST URL on the cluster manager.
        auth: ``user:password``; only the first ':' separates the two, so
            passwords may contain ':'.
        method: HTTP method.
        body: dict of parameters, form-encoded into the request body.

    Raises:
        Exception("Authorization Failed", url, response) on HTTP 401;
        Exception(url, response) on any other non-200 status.
    """
    user, password = auth.split(':', 1)
    # TLS certificate validation is deliberately disabled, as in the original.
    http = httplib2.Http(disable_ssl_certificate_validation=True)
    http.add_credentials(user, password)
    if body is None:
        body = {}
    response, content = http.request(url, method, urlencode(body))
    if response.status == 401:
        raise Exception("Authorization Failed", url, response)
    if response.status != 200:
        raise Exception(url, response)
    return content


def validate_rest(master_uri, auth):
    """Fetch the manager's cluster/master/info feed; raises if it is not reachable."""
    return get_response_feed(master_uri + '/services/cluster/master/info', auth)


def freeze_bucket(master_uri, auth, bid):
    """POST a freeze request for bucket *bid* and return the parsed response feed."""
    return get_response_feed(master_uri + '/services/cluster/master/buckets/' + bid + '/freeze', auth, 'POST')


def freeze_from_file(master_uri, auth, path=BUCKET_LIST_PATH):
    """Re-freeze every bucket marked frozen in the bucket-list XML at *path*.

    Failures for individual buckets are printed and skipped. A summary line
    with the total, frozen and successfully re-frozen counts is printed at
    the end.
    """
    handler = BucketHandler()
    with open(path) as bucket_file:
        parse(bucket_file, handler)
    buckets = handler.getBuckets()
    frozen_count = 0
    refrozen_count = 0
    for bid, bucket in buckets.items():
        if not bucket.frozen:
            continue
        frozen_count += 1
        try:
            freeze_bucket(master_uri, auth, bid)
            refrozen_count += 1
        except Exception as e:
            # Best effort: report and continue with the remaining buckets.
            print(e)
    print("Total bucket count:: %u; number frozen: %u; number re-frozen: %u" % (len(buckets), frozen_count, refrozen_count))


def restart_master(master_uri, auth):
    """Raise the quiet period, restart Splunk locally, then wait for the prior peers to rejoin."""
    change_quiet_period(master_uri, auth)
    original_num_peers = num_peers_up(master_uri, auth)
    print("\n" + "Issuing cluster manager restart" + "\n")
    return_code = subprocess.call([os.path.join(os.environ["SPLUNK_HOME"], "bin", "splunk"), "restart"])
    if return_code != 0:
        # Don't abort (matching the original), but make a failed restart visible.
        print("Warning: 'splunk restart' exited with status " + str(return_code))
    print("\n" + "The cluster manager was restarted" + "\n")
    print("\n" + "Waiting for all " + str(original_num_peers) + " peers to come back up" + "\n")
    wait_for_peers(master_uri, auth, original_num_peers)
    print("\n" + "Making sure we have the correct number of frozen buckets" + "\n")


def main():
    """Parse the command line and run the requested mode against the given manager URI."""
    usage = "usage: %prog [options] --auth admin:changeme manager_uri"
    parser = OptionParser(usage)
    parser.add_option("-a", "--auth", dest="auth", metavar="user:password", default=':',
                      help="Splunk authentication parameters for the cluster manager instance")
    parser.add_option("-g", "--get_list", action="store_true",
                      help="get a list of frozen buckets and store them in buckets.xml")
    parser.add_option("-f", "--freeze_from", dest="freeze_from",
                      help="path to the file that contains the list of buckets to be frozen. "
                           "ie path to the buckets.xml generated by the get_list option above")
    (options, args) = parser.parse_args()
    if len(args) == 0:
        parser.error("manager_uri is required")
    elif len(args) > 1:
        parser.error("incorrect number of arguments")
    master_uri = args[0]

    try:
        validate_rest(master_uri, options.auth)
    except Exception:
        print("Failed to access the cluster manager info endpoint make sure you've supplied the authentication credentials")
        raise

    if options.get_list:
        # Snapshot only: store the bucket list at BUCKET_LIST_PATH.
        print("Only getting the list of buckets and storing it at " + BUCKET_LIST_PATH)
        get_buckets_list(master_uri, options.auth)
    elif options.freeze_from:
        print("Reading the list of buckets from " + options.freeze_from + " and refreezing them")
        freeze_from_file(master_uri, options.auth, options.freeze_from)
    else:
        print("Restarting the cluster manager safely to preserve knowledge of frozen buckets")
        get_buckets_list(master_uri, options.auth)
        restart_master(master_uri, options.auth)
        freeze_from_file(master_uri, options.auth, BUCKET_LIST_PATH)


if __name__ == '__main__':
    main()