# coding=utf-8 # # Main utility module for general Splunk stuff # from builtins import zip from builtins import range from builtins import map from datetime import timedelta, tzinfo, datetime import time import re import os, sys, codecs import math import subprocess from future.moves.urllib import parse try: # Python 3 from collections import UserDict except ImportError: # Python 2 from UserDict import UserDict # Normally, # # import html # # would be fine, but because "html" is a generic module name new to # Python 3, and because apps can mangle sys.path/site/PYTHONPATH, we may # import an "html" module that isn't one from Python, but one from an app # instead. By using importlib, we guarantee we're importing Python's # "html". if sys.version_info >= (3, 0): from importlib.util import find_spec, module_from_spec html_spec = find_spec("html", os.path.join(os.path.dirname(sys.executable), "lib")) html = module_from_spec(html_spec) html_spec.loader.exec_module(html) else: import HTMLParser FIELD_DELIMITER = "," FIELD_ESCAPE = "\\" FIELD_QUOTE = "\"" FIELD_DELIMITER_CHARS = " ,\\" BAD_URL_PARAM = re.compile(r"javascript[\\t\s\W]*?:") # Defines string_type for use in checking whether a variable is a string-like # thing with isinstance, e.g. instead of: # # isinstance(s, basestring) # # you should use: # # isinstance(s, splunk.util.string_type) # # This is a hack for removing basestring. You should not use it in new code, # and if you're refactoring code, please remove use of this and check against: # # For Unicode string (Python 3 str) # # - Python 2: splunk.util.unicode (do NOT use unicode!) or # builtins.str (from future) # - Python 3: str # # For bytes: # # - Python 2: bytes # - Python 3: bytes # # If your code does not actually care whether something is str or bytes, # check with: # # isinstance(s, str) or isinstance(s, bytes) # if sys.version_info >= (3, 0): string_type = (str, bytes) unicode = str else: import __builtin__ string_type = __builtin__.basestring unicode = unicode def cmp(x, y): """cmp(x, y) -> integer. Replacement for Python 2's cmp in Python 3. If you're using this, please refactor to NOT use it. :returns: Negative if x < y, 0 if x == y, positive if x > y. """ if sys.version_info < (3, 0): # Python 2 import __builtin__ return __builtin__.cmp(x, y) else: # Python 3+ return (x > y) - (x < y) def normalizeBoolean(input, enableStrictMode=False, includeIntegers=True): ''' Tries to convert a value to Boolean. Accepts the following pairs: true/false t/f/ 0/1 yes/no on/off y/n If given a dictionary, this function will attempt to iterate over the dictionary and normalize each item. If given a list, this function will attempt to iterate over the list and normalize each item. If enableStrictMode is True, then a ValueError will be raised if the input value is not a recognized boolean. If enableStrictMode is False (default), then the input will be returned unchanged if it is not recognized as a boolean. Thus, they will have the truth value of the python language. NOTE: Use this method judiciously, as you may be casting integer values into boolean when you don't want to. If you do want to get integer values, the idiom for that is: try: v = int(v) except ValueError: v = splunk.util.normalizeBoolean(v) This casts integer-like values into 'int', and others into boolean. ''' trueThings = ['true', 't', 'on', 'yes', 'y'] falseThings = ['false', 'f', 'off', 'no', 'n'] if includeIntegers: trueThings.append('1') falseThings.append('0') def norm(input): if input == True: return True if input == False: return False try: test = input.strip().lower() except: return input if test in trueThings: return True elif test in falseThings: return False elif enableStrictMode: raise ValueError('Unable to cast value to boolean: %s' % input) else: return input if isinstance(input, dict): for k, v in input.items(): input[k] = norm(v) return input elif isinstance(input, list): for i, v in enumerate(input): input[i] = norm(v) return input else: return norm(input) def stringToFieldList(string): ''' Given a string split it apart using the field list rules: 1) Comma is the default delimiter (space works only for the field_list param on the /search/jobs//events endpoint). 2) All fields that contain the delimiter (space or comma), escape or a double quote char ", must be quoted. 3) Backslash is used to escape backslashes and double quote characters. All other instances are interpreted as backslash chars. For example: stringToFieldList('one two, three \\ four "and\" five"') literal ["one", "two", "three", "\", "four", 'and" five'] ''' if not isinstance(string, string_type): return [] items = [] item_buffer = [] in_quote = False iterator = enumerate(string) for i, c in iterator: if c == FIELD_ESCAPE: try: next_item = next(iterator)[1] if next_item in ['"', '\\']: item_buffer.append(next_item) continue else: item_buffer.append(FIELD_ESCAPE) c = next_item except StopIteration: item_buffer.append(c) continue if c == FIELD_QUOTE: if not in_quote: in_quote = True continue if in_quote: in_quote = False items.append(''.join(item_buffer)) item_buffer = [] continue if c in FIELD_DELIMITER_CHARS and not in_quote: if len(item_buffer) > 0: items.append(''.join(item_buffer)) item_buffer = [] continue item_buffer.append(c) if len(item_buffer) > 0: items.append(''.join(item_buffer)) return items def fieldListToString(fieldList, delimiter=FIELD_DELIMITER): ''' Given a list of strings, converts the list into a valid Unicode string compliant with the splunkd field_list attribute. A valid field list string is delimited by either comma (default) or space and groups by the double quote char ". Backslash escapes " and itself. Arguments: fieldList -- A list of strings to convert into a valid field list string. Example usage: >> field_list = ["_raw", "first \ ip", '"weird quoted string"'] >> fieldListToString(field_list) >> '_raw,"first \\ ip","\"weird quoted string\""' Returns: A Unicode string of all the elements in lst deliminated by the given delimiter. ''' re_escaped = re.escape(FIELD_ESCAPE) delimiter_matcher = re.compile("[%s]" % re.escape(FIELD_DELIMITER_CHARS)) escapable = re.compile("([%s])" % (re_escaped + FIELD_QUOTE)) output_buffer = [] for item in fieldList: # Convert all items to strings. This allows objects to def a __str__ # method and just work. May raise an exception if something cannot # be converted to a unicode string. item = unicode(item) item = item.strip() if item == '': continue # Escape all backslashes or double quotes if escapable.search(item): item = escapable.sub(re_escaped + r"\1", item) # Finally quote the item if needed and return a unicode string. if delimiter_matcher.search(item): item = u''.join([FIELD_QUOTE, item, FIELD_QUOTE]) output_buffer.append(item) return delimiter.join(output_buffer) def smartTrim(string, maxLength=50, placeholder='...'): ''' Returns a string trimmed to maxLength by removing characters from the middle of the string and replacing with ellipses. Ex: smartTrim('1234567890', 5) ==> '12...890' ''' if not string: return string if int(maxLength) < 1: return string if len(string) <= maxLength: return string if maxLength == 1: return string[0:1] + placeholder midpoint = math.ceil(len(string) / 2.0) toremove = len(string) - maxLength lstrip = math.ceil(toremove / 2.0) rstrip = toremove - lstrip lbound = int(midpoint - lstrip) rbound = int(midpoint + rstrip) return string[0:lbound] + '...' + string[rbound:] # # Time handling routines # def getTimeOffset(t=None, dual_output=False): """Return offset of local zone from GMT in seconds, either at present or at time t.""" # python2.3 localtime() can't take None if t is None: t = time.time() if not dual_output: if time.localtime(t).tm_isdst and time.daylight: return -time.altzone else: return -time.timezone return (-time.timezone, -time.altzone) def format_local_tzoffset(t=None): ''' Render the current process-local timezone offset in standard -0800 type format for the present or at time t. ''' offset_secs = getTimeOffset(t) plus_minus = "+" if offset_secs < 0: plus_minus = '-' offset_secs = abs(offset_secs) hours, rem_secs = divmod(offset_secs, 3600 ) # 60s * 60m -> hours minutes = rem_secs // 60 return "%s%0.2i%0.2i" % (plus_minus, hours, minutes) # defines time format string for ISO-8601 datetimes ISO_8601_STRFTIME = '%Y-%m-%dT%H:%M:%S' + format_local_tzoffset() # defines time format string for ISO-8601 datetimes, with a token for microsecond # insertion by a second pass; see getIsoTime ISO_8601_STRFTIME_MSECOND = '%Y-%m-%dT%H:%M:%S{msec}' + format_local_tzoffset() # defines canonical 0 time difference ZEROTIME = timedelta(0) # defines canonical 1 hour time difference HOUR = timedelta(hours=1) # define local non-DST offset STDOFFSET = timedelta(seconds = -time.timezone) # defin local DST offset if time.daylight: DSTOFFSET = timedelta(seconds = -time.altzone) else: DSTOFFSET = STDOFFSET class UTCInfo(tzinfo): """ Represents a UTC timezone. Use when a timezone-aware datetime() needs to be identified as a UTC time. Most invocations should use the singleton instance defined as splunk.util.utc """ def utcoffset(self, dt): return ZEROTIME def tzname(self, dt): return "UTC" def dst(self, dt): return ZEROTIME utc = UTCInfo() class LocalTZInfo(tzinfo): ''' Represents the local server's idea of its native timezone. Use when creating a timezone-aware datetime() object. Most invocations should use the singleton instance defined as splunk.util.localTZ ''' def utcoffset(self, dt): if self._isdst(dt): return DSTOFFSET else: return STDOFFSET def dst(self, dt): if self._isdst(dt): return DSTOFFSET - STDOFFSET else: return ZEROTIME def tzname(self, dt): return time.tzname[self._isdst(dt)] def _isdst(self, dt): try: tt = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.weekday(), 0, -1) stamp = time.mktime(tt) tt = time.localtime(stamp) return tt.tm_isdst > 0 except: return False localTZ = LocalTZInfo() class TZInfo(tzinfo): """ Represents a generic fixed offset timezone, as specified by 'offset' in minutes east of UTC (US is negative minutes). Setting offset=0 or None will result in a UTC-like timezone object that coerces an enclosing datetime()->time_struct with is_dst=-1. """ def __init__(self, offset=None, name=''): if offset == None: offset = getTimeOffset() // 60 self.__offset = timedelta(minutes = offset) self.__name = name def utcoffset(self, dt): return self.__offset def tzname(self, dt): return self.__name def dst(self, dt): return ZEROTIME def __repr__(self): return '' % (self.__offset, self.__name) iso_re = None offset_re = None BYTE_PARSE_REX = None compiled_regexes = False def _compile_regexes(): global compiled_regexes, iso_re, offset_re, BYTE_PARSE_REX if compiled_regexes: return iso_re = re.compile(r'(\d{4})\-(\d{2})-(\d{2})[T ](\d{2}):(\d{2}):(\d{2})(\.(\d{1,6}))?(z|Z|[\+\-]\d{2}\:?\d{2})?') offset_re = re.compile(r'([\+\-]?)(\d{2})\:?(\d{2})') BYTE_PARSE_REX = re.compile(r'(\-?[0-9\.]+)\s*([A-Za-z]{1,3})') compiled_regexes = True def parseISO(timestamp, strict=False): ''' Converts an ISO-8601 datetime string into a native python datetime.datetime object. This only supports a strict well-formed time: Offset-explicit timezone: 2005-07-01T00:00:00.000-07:00 2005-07-01 00:00:00.000-07:00 2005-07-01 00:00:00.000-0700 The datetime object's tzinfo will be set to an instance of splunk.util.TZInfo() UTC timezone: 2005-07-01T00:00:00.000Z 2005-07-01T00:00:00.000+00:00 The datetime object's tzinfo will be set to splunk.util.utc Local server timezone: 2005-07-01T00:00:00.000 The datetime object's tzinfo will be set to splunk.util.localTZ @param {Boolean} strict Indicates if an exception should be thrown if 'timestamp' is not a valid ISO-8601 string ''' _compile_regexes() match = iso_re.search(timestamp) if match: year = int(match.group(1)) month = int(match.group(2)) day = int(match.group(3)) hour = int(match.group(4)) minute = int(match.group(5)) second = int(match.group(6)) msecond = 0 if match.group(8): numtext = match.group(8) msecond = int(numtext) # if not microseconds, multiply by power to get number of microseconds if len(numtext) < 6: msecond *= math.pow(10, 6-len(numtext)) msecond = int(msecond) # must be int tz = match.group(9) if tz in ('z', 'Z'): tzinfo = utc elif tz: tzinfo = TZInfo(parseISOOffset(tz), '') else: # set timezone as local server tz tzinfo = localTZ return datetime(year, month, day, hour, minute, second, msecond, tzinfo) else: if strict: raise ValueError('Cannot interpret value as ISO-8601: %s' % timestamp) else: return datetime(1, 1, 1) def parseISOOffset(offset): ''' Converts a string ISO-8601 timezone offset literal into minutes. ex: -0700 -07:00 +00:00 +10:26 ''' _compile_regexes() match = offset_re.search(offset) if match: dir = int('%s1' % match.group(1)) hours = int(match.group(2)) minutes = int(match.group(3)) return dir * ((hours * 60) + minutes) else: raise ValueError("Unknown time offset value: %s" % offset) def getISOTime(ts=None): ''' Returns an ISO-8601 formatted string that represents the timestamp. ts can be a time struct or datetime() object. If ts is a time.struct_time, then it is assumed to be in local time offset. If no value passed, then the current time is returned, in local time offset ''' if isinstance(ts, datetime): if ts.microsecond: output = ts.strftime(ISO_8601_STRFTIME_MSECOND) output = output.replace('{msec}', '.%03d' % int(ts.microsecond/1000.0)) return output else: return ts.strftime(ISO_8601_STRFTIME) elif isinstance(ts, time.struct_time): # first get offset of ts in local timezone offset = getTimeOffset(time.mktime(ts)) // 60 dt = datetime(ts[0], ts[1], ts[2], ts[3], ts[4], ts[5], 0, TZInfo(offset)) return dt.strftime(ISO_8601_STRFTIME) elif not ts: return datetime.now().strftime(ISO_8601_STRFTIME) else: raise ValueError('Unable to parse timestamp; not recognized as datetime object or time struct: %s' % ts) def mktimegm(tuple): """ UTC version of time.mktime() written by Guido van Rossum """ import calendar EPOCH = 1970 year, month, day, hour, minute, second = tuple[:6] assert 1 <= month <= 12 days = 365*(year-EPOCH) + calendar.leapdays(EPOCH, year) for i in range(1, month): days = days + calendar.mdays[i] if month > 2 and calendar.isleap(year): days = days + 1 days = days + day - 1 hours = days*24 + hour minutes = hours*60 + minute seconds = minutes*60 + second return seconds def dt2epoch(datetime): ''' Converts a datetime.datetime object into epoch time, with microsecond support ''' if datetime == None: raise ValueError('Cannot convert empty value') basetime = mktimegm(datetime.utctimetuple()) import decimal return decimal.Decimal('%s.%06d' % (basetime, datetime.microsecond)) def readSplunkFile(path): ''' Returns a file that exists inside $SPLUNK_HOME. All paths are homed at SPLUNK_HOME Ex: readSplunkFile('README.txt') ==> returns $SPLUNK_HOME/README.txt readSplunkFile('/README.txt') ==> returns $SPLUNK_HOME/README.txt readSplunkFile('etc/log.cfg') ==> returns $SPLUNK_HOME/etc/log.cfg TODO: this probably has some quirks in windows ''' home = os.environ['SPLUNK_HOME'] if not home or home == '/': raise Exception('readSplunkFile requires a SPLUNK_HOME to be set') workingPath = path.strip(os.sep) workingPath = os.path.join(home, workingPath) pathParts = os.path.split(workingPath) pathParts = [x for x in pathParts if x != os.pardir] finalPath = os.path.join(*pathParts) fh = open(os.path.abspath(finalPath), 'r') try: output = fh.readlines() return output finally: if fh: fh.close() class OrderedDict(UserDict, object): ''' Provides a dictionary that respects the order in which items were inserted. Upon iteration or pop, items will be returned in the original order. The OrderedDict can be populated on instantiation by passing a list of tuples, ex: OrderedDict([ ('name', 'Name'), ('userid', "User ID"), ('schedule', "Schedule"), ('lastrun', 'Last Run On'), ('nextrun', 'Next Run At'), ('enableSched', "Enabled") ]) ''' def __init__(self, dict = None): self._keys = [] if isinstance(dict, list): UserDict.__init__(self) for x in dict: self[x[0]] = x[1] else: UserDict.__init__(self, dict) def __delitem__(self, key): UserDict.__delitem__(self, key) self._keys.remove(key) def __setitem__(self, key, item): UserDict.__setitem__(self, key, item) if key not in self._keys: self._keys.append(key) def __iter__(self): return self._keys.__iter__() def iterKeys(self): return self._keys.__iter__() def __str__(self): o = [] for k in self: o.append("'%s': '%s'" % (k, self[k])) return '{' + ', '.join(o) + '}' def clear(self): UserDict.clear(self) self._keys = [] def copy(self): dict = UserDict.copy(self) dict._keys = self._keys[:] return dict def items(self): return zip(self._keys, list(self.values())) def keys(self): return self._keys def popitem(self, last=True): try: idx = -1 if not last: idx = 0 key = self._keys[idx] except IndexError: raise KeyError('dictionary is empty') val = self[key] del self[key] return (key, val) def setdefault(self, key, failobj = None): UserDict.setdefault(self, key, failobj) if key not in self._keys: self._keys.append(key) def update(self, dict): UserDict.update(self, dict) for key in dict.keys(): # keys() is required here to let the __init__() call complete and then let the __iter__() be called if key not in self._keys: self._keys.append(key) def values(self): return list(map(self.get, self._keys)) def urlencodeDict(query): ''' Convert a dictionary to a url-encoded" string. Multi-values keys can be assigned using a list (eg., {"foo": ["bar1", "bar2"]}. Note: None type values are removed. ''' qargs = [] [ qargs.extend([(k, e) for e in v]) for k,v in [ (k, v if isinstance(v, (list, tuple)) else (v,) ) for k, v in query.items() if v != None ] ] return '&'.join( [ '%s=%s' % ( safeURLQuote(unicode(k)),safeURLQuote(unicode(v)) ) for k,v in qargs ] ) def toUnicode(obj, decodeFrom='utf-8'): ''' Attempts to decode obj into a unicode object if obj is a str, otherwise simply returns obj. Primarily used as a helper function in toUnicode. ''' if sys.version_info >= (3, 0): if isinstance(obj, str): return obj elif isinstance(obj, (bytearray, bytes)): return obj.decode() if isinstance(obj, str) and not isinstance(obj, unicode): return unicode(obj, decodeFrom) elif '__str__' in dir(obj): return unicode(obj) return obj def toUTF8(obj, decodeFrom='utf-8', encodeTo='utf-8'): ''' Attempts to return a utf-8 encoded str object if obj is an instance of basestring, otherwise just returns obj. Can be used to safely print out high byte unicode characters. Example: # This assumes the string entered is input in utf-8 foo = u'Kivimäki2' parse.quote(splunk.util.toUTF8(foo)) ''' if sys.version_info >= (3, 0) and isinstance(obj, str): return obj.encode(encodeTo) if isinstance(obj, unicode): return obj.encode(encodeTo) elif isinstance(obj, str): return obj.decode(decodeFrom).encode(encodeTo) elif '__str__' in dir(obj): return toUTF8(unicode(obj)) return obj if sys.version_info >= (3, 0): toDefaultStrings = toUnicode else: toDefaultStrings = toUTF8 def objUnicode(obj, decodeFrom='utf-8', deep=True): ''' Ensures all strings passed in are returned as unicode. Can handle strings in lists, dicts and tuples. By default does a deep traversal to convert all strings to unicode. Example: toUnicode({'one': 'one', {'two': 2, 'three': u'three', 'four': 'four'}}) will return: {'one': u'one', {'two': 2, 'three': u'three', 'four', u'four'}} ''' mapFunc = objUnicode if not deep: mapFunc = toUnicode if isinstance(obj, str): return toUnicode(obj, decodeFrom) elif isinstance(obj, list) or isinstance(obj, tuple): out = [] if not deep: for item in obj: if not isinstance(item, str): out.append(item) else: out.append(mapFunc(item, decodeFrom)) return obj.__class__(out) else: return obj.__class__([mapFunc(item, decodeFrom) for item in obj]) elif isinstance(obj, dict) or isinstance(obj, UserDict): out = [] if not deep: for key, value in list(obj.items()): if not isinstance(value, str): out.append((key, value)) else: out.append((key, mapFunc(value, decodeFrom))) return obj.__class__(out) else: return obj.__class__([(key, mapFunc(value, decodeFrom)) for key, value in list(obj.items())]) return obj def safeURLQuote(string, safe='/', decodeFrom='utf-8', encodeFrom='utf-8'): ''' Safely encode high byte characters from unicode or some other encoding to UTF-8 url strings. For some reason parse.quote can't handle high byte unicode strings, although parse.unquote can unquote anything. Awesome. Always returns STR objects! ''' return parse.quote(toUTF8(string, decodeFrom, encodeFrom), safe) def safeURLQuotePlus(string, safe='', decodeFrom='utf-8', encodeFrom='utf-8'): ''' Safely encode high byte characters from unicode or other encodings to UTF-8 using the default HTML form encoding style where space is represented by a plus sign "+". ''' return parse.quote_plus(toUTF8(string, decodeFrom, encodeFrom), safe) def setSSLWrapProtocol(ssl_protocol_version): """ Sometimes we need to insist that outbound connections are made using SSL v3 rather than v2 or v3. parse, httplib and httplib2 provide no easy way to do this so this function monkey patches ssl.wrap_socket to change the default protocol """ import ssl def wrap_socket(sock, keyfile=None, certfile=None, server_side=False, cert_reqs=ssl.CERT_NONE, ssl_version=ssl_protocol_version, ca_certs=None, do_handshake_on_connect=True, suppress_ragged_eofs=True): return ssl.SSLSocket(sock, keyfile=keyfile, certfile=certfile, server_side=server_side, cert_reqs=cert_reqs, ssl_version=ssl_version, ca_certs=ca_certs, do_handshake_on_connect=do_handshake_on_connect, suppress_ragged_eofs=suppress_ragged_eofs) ssl.wrap_socket = wrap_socket def isRedirectSafe(url): ''' See if a string returns a network path location ''' if not url: return False urlDecode = parse.unquote(str(url)) if sys.version_info >= (3, 0): htmlDecode = html.unescape(urlDecode) else: htmlParser = HTMLParser.HTMLParser() htmlDecode = htmlParser.unescape(urlDecode) # Catch things like https:// http:// file:// o = parse.urlparse(htmlDecode.strip()) if o.scheme: return False if BAD_URL_PARAM.match(url) is not None: return False slash_prefix_cnt = 0 for c in url: if c in ('/', '\\'): slash_prefix_cnt += 1 if slash_prefix_cnt >= 2: return False elif c in ('\x09', '\x0b'): continue else: return True def sanitizeUrl(url): return url if isRedirectSafe(url) else '/' def sanitizeBreadcrumbs(breadcrumbs): ''' Given a set of breadcrumb tuples, determine if these have safe urls ''' if not isinstance(breadcrumbs, list): return [] for crumb in breadcrumbs: if isinstance(crumb, list) and len(crumb) > 1: if not isRedirectSafe(crumb[1]): # The crumb has an invalid url, make_url requires something valid, so we give a path to homepage crumb[1] = '/' return breadcrumbs def isValidUnsignedFloat(x): try: return float(x) >= 0 except ValueError: return False def parseByteSizeString(input_string, base=2): ''' Parses a string that identifies a byte size string. Input values can be numeric with a suffix of the forms: B, KB, MB, ..., YB (SI, binary) KiB, MiB, ..., YiB (IEC) Values that do not have a suffix are assumed to be of units 'B'. The 'base' parameter can be specified what base to use when converting the input_string down to bytes. This parameter is ignored if an IEC suffix is detected. Defaults to 2. USAGE >>> parseByteSizeString('16MB') == {'byte_value': 16777216.0, 'relative_value': 16.0, 'units': 'MB'} True ''' _compile_regexes() match = BYTE_PARSE_REX.search(input_string) # if input is unqualified, assume to be bytes if match == None: try: byte_value = float(input_string) except: raise ValueError('cannot parse byte size string: %s' % input_string) relative_value = byte_value units = 'B' # otherwise normalize as necessary else: relative_value = float(match.group(1)) units = match.group(2) if units.upper().find('I') == 1: base = 2 elif base not in (2, 10): raise ValueError('unsupported base: %s' % base) # define the mapping from value magnitude to friendly suffix prefix_map = { 'YIB': (80, 0), 'ZIB': (70, 0), 'EIB': (60, 0), 'PIB': (50, 0), 'TIB': (40, 0), 'GIB': (30, 0), 'MIB': (20, 0), 'KIB': (10, 0), 'YB': (80, 24), 'ZB': (70, 21), 'EB': (60, 18), 'PB': (50, 15), 'TB': (40, 12), 'GB': (30, 9), 'MB': (20, 6), 'KB': (10, 3), 'B': ( 0, 0) } map_index = 0 if base == 2 else 1 try: adjustment_exponent = prefix_map[units.upper()][map_index] except: raise ValueError('unknown size prefix: %s' % units) byte_value = (base ** adjustment_exponent) * relative_value return { 'byte_value': byte_value, 'relative_value': relative_value, 'units': units } def uuid4(): """ Wrapper around the uuid.uuid4() method that satisfies consumers who previously used our custom one. Please use the uuid module directly in any new code. """ import uuid return str(uuid.uuid4()) def splithost(hostport): """ Split a host:port string into a (host, port) tuple Correctly splits [host]:port IPv6 addresses port is set to None if not present in the string """ port = None if hostport.startswith('[') and hostport.find(']') > 0: host = hostport[1:hostport.find(']')] hostport = hostport[hostport.find(']') + 1:] if hostport.startswith(':'): port = int(hostport[1:]) else: hostport = hostport.split(':', 1) if len(hostport) > 1: host = hostport[0] port = hostport[1] else: host = hostport[0] return (host, port) def outsideSplunkHome(filePath): splunk_home = os.path.normcase(os.path.abspath(os.environ['SPLUNK_HOME'])) absFilePath = os.path.normcase(os.path.abspath(filePath)) if splunk_home != os.path.dirname( os.path.commonprefix([splunk_home + os.sep, absFilePath])): return True return False def ensureCerts(): """ if requireClientCert = false, return None/None; otherwise, ensure that the web.conf keyfile and certfile are present. If they are not, fall back to splunkweb fail-safe defaults, generating the certs if necessary (to help _http tests). """ import splunk.clilib.cli_common as comm from splunk.clilib.bundle_paths import make_splunkhome_path certfile = None keyfile = None # NOTE: use the cached merged instances of server/web.conf (in $SPLUNK_HOME/var/run/splunk/merged/) # they are regenerated everytime splunkweb is restarted. Spawning btool could take a long time !!! if normalizeBoolean(comm.getOptConfKeyValue('server', 'sslConfig', 'requireClientCert')): # SPL-227013: privKeyPath, serverCert defaults are $SPLUNK_HOME/etc/auth/splunkweb/.pem, # but spec says they can be relative to $SPLUNK_HOME so account for both splunk_home = os.environ['SPLUNK_HOME'] certfile = os.path.expandvars(comm.getWebConfKeyValue('serverCert', 'caCertPath')) if not os.path.isabs(certfile): certfile = os.path.join(splunk_home, certfile) keyfile = os.path.expandvars(comm.getWebConfKeyValue('privKeyPath')) if not os.path.isabs(keyfile): keyfile = os.path.join(splunk_home, keyfile) if outsideSplunkHome(certfile) or outsideSplunkHome(keyfile): raise ValueError( "serverCert, caCertPath, privKeyPath can not refer to paths outside $SPLUNK_HOME") if not (os.path.exists(keyfile) or os.path.exists(certfile)): safe_path = make_splunkhome_path(['etc', 'auth', 'splunkweb']) if not os.path.exists(safe_path): os.makedirs(safe_path, 0o700) certfile = os.path.join(safe_path, 'cert.pem') keyfile = os.path.join(safe_path, 'privkey.pem') if not (os.path.exists(keyfile) and os.path.exists(certfile)): import shutil for file in [certfile, keyfile]: if os.path.exists(file): # prevent completely nuking a good cert shutil.move(file, file + '.bak') splunk_cmd = 'splunk' if sys.platform.startswith('win'): splunk_cmd = 'splunk.exe' # windows requires the fully qualified path to splunk splunk_bin = os.path.join(splunk_home, 'bin', splunk_cmd) try: subprocess.call([splunk_bin, 'createssl', 'web-cert']) except Exception as ex: raise return (keyfile, certfile) STRING_INTERPOLATION_RE_STRING="\$([^$]*)\$" STRING_INTERPOLATION_RE=re.compile(STRING_INTERPOLATION_RE_STRING) def interpolateString(template, dictionary): """ template is of form: 'blah blah $token1$ blah $token2$' dictionary is {'token1': 'Hello', 'token2': 'World'} result is: 'blah blah Hello blah World """ result = template templateTokens = STRING_INTERPOLATION_RE.findall(template) for templateToken in templateTokens: if templateToken in dictionary: result = re.sub("\$%s\$" % templateToken, dictionary[templateToken], result) return result def pytest_mark_skip_conditional(reason=None, allow_module_level=False): """ We want to run from splunk.util import pytest_mark_skip_conditional and run mark.skip but only in dev builds. reason can be any description as to why the test is skipped If we fail to import it means that we are in a production build so we will just run the given function """ def pytest_mark_skip_decorator(func): try: import pytest func = pytest.mark.skipif(reason=reason, allow_module_level=allow_module_level)(func) return func except ImportError as error: return func return pytest_mark_skip_decorator def pytest_mark_skipif(*args, **kwargs): """Equivalent of pytest.mark.skipif, but safe to use for release builds.""" def pytest_mark_skipif_decorator(func): try: import pytest func = pytest.mark.skipif(*args, **kwargs)(func) return func except ImportError as error: return func return pytest_mark_skipif_decorator