You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

245 lines
14 KiB

9 months ago
from commonAuth import *
# Module-wide logger. getLogger and logPath are not defined in this file;
# presumably they are provided by the star-import from commonAuth.
logger = getLogger("{}/splunk_scripted_authentication_okta.log".format(logPath), "okta")

# The imports that follow (urllib.parse) only exist on Python 3, so stop here
# with a clear log entry instead of continuing into an ImportError traceback.
if sys.version_info < (3, 0):
    logger.error("Python 2 has been deprecated. Use Python 3 to execute this script instead.")
    sys.exit(1)
import requests
import json
from urllib.parse import quote
# This is for getting SAML user information, it is an alternative to using SAML attribute
# query requests (AQR) which Okta does not support.
# Provide Okta API key credentials and base url in the authentication.conf
# file or using the Splunk Web UI
# (Settings > Authentication Methods > SAML Configuration > Authentication Extensions)
# and use the Okta API to extract user information.
# In authentication.conf, configure the 'scriptSecureArguments' setting to
# "apiKey:<your Okta API key>" and "baseUrl:<your Okta url>". For example:
# scriptSecureArguments = apiKey:<your Okta API key string>,baseUrl:<your Okta url>
# After you restart the Splunk platform, the platform encrypts your Okta credentials.
# For more information about Splunk platform configuration files, search the
# Splunk documentation for "about configuration files".
# In Splunk Web UI under Authentication Extensions > Script Secure Arguments:
# key = apiKey, value = <your Okta API key string>
# key = baseUrl, value = <your Okta url>
# Module-level error message string; empty by default.
errMsg = ""

# Value passed as ``timeout=`` to every HTTP request this script sends to Okta.
request_timeout = 10
class _LookupFailed(Exception):
    """Internal signal: abort getUserInfo() and report str(exc) as the failure message."""


def _failure(errMsg):
    """Format ``errMsg`` as the failure line returned by this script."""
    return FAILED + " " + ERROR_MSG + errMsg


def _raiseIfRateLimited(username, response, kind):
    """Log and raise _LookupFailed when the IdP answered with HTTP 429.

    ``kind`` is "user" or "group"; it only selects the wording of the messages.
    """
    if response.status_code != 429:
        return
    logger.error("Rate limit reached for IdP, failed to get {0} info for username={1} with {0} "
                 "response status={2} and {0} response={3}".format(kind, username, response.status_code, response.text))
    raise _LookupFailed("Rate limit reached for IdP, failed to get {0} info for username={1} "
                        "with {0} response status={2}".format(kind, username, response.status_code))


def _parseUserResponse(username, response):
    """Decode the JSON body of a user lookup, raising _LookupFailed if it is not valid JSON."""
    try:
        return json.loads(response.text)
    except ValueError as e:
        raise _LookupFailed("Failed to parse user info for username={} with error={}".format(username, str(e))) from e


def _fetchUser(session, args, username):
    """Look the user up in Okta.

    Returns ``(nameAttributes, groupsUrl, usernameResponse)``: the decoded user
    object (guaranteed to be a dict containing 'status'), the URL of the user's
    first page of groups, and the HTTP response of the lookup.
    Raises _LookupFailed with a user-facing message on any failure.
    """
    baseUrl = args['baseUrl']
    # safe='' also escapes '/', so the username cannot add path segments to the URL.
    encoded_username = quote(username, safe='')
    badCredentialsMsg = "It appears your baseUrl and/or apiKey are incorrect. Check your Okta instance URL " \
                        "and search the Okta documentation to retrieve the apiKey: " \
                        "\"Create the token | Okta Developer\""

    if OKTA_USER_SEARCH_INPUT not in args:
        # By default the username itself is used to query user information from Okta:
        #   <baseUrl>/api/v1/users/<url-encoded username>
        usernameUrl = baseUrl + '/api/v1/users/' + encoded_username
        groupsUrl = usernameUrl + '/groups'
        logger.info("Okta username url is {}".format(usernameUrl))
        usernameResponse = session.get(usernameUrl, timeout=request_timeout)

        _raiseIfRateLimited(username, usernameResponse, "user")
        if usernameResponse.status_code != 200:
            if usernameResponse.status_code == 401:
                errMsg = badCredentialsMsg
            elif usernameResponse.status_code == 404:
                errMsg = "User not found. The user you are querying (username={}) does not exist".format(username)
            else:
                errMsg = "Failed to get user info for username={} with status={} and response={}".format(username, usernameResponse.status_code, usernameResponse.text)
            raise _LookupFailed(errMsg)

        nameAttributes = _parseUserResponse(username, usernameResponse)
        if not isinstance(nameAttributes, dict) or 'status' not in nameAttributes:
            raise _LookupFailed("Failed to parse user info for username={}, 'status' not present in response output: {}".format(username, usernameResponse.text))
        return nameAttributes, groupsUrl, usernameResponse

    # In rare cases (like when Okta has been paired with a customer's Active Directory) the
    # username may *not* be usable directly to look up user information. In such cases another
    # attribute (e.g. samAccountName) is needed, so the customer may choose the attribute to
    # search on. The attribute name is passed to the script under the OKTA_USER_SEARCH_INPUT
    # key, either through the SAML configuration page or through authentication.conf.
    # Only one attribute is allowed as an input to search. The resulting request is:
    #   <baseUrl>/api/v1/users/?search=<url-encoded '<attribute> eq "<username>"'>
    # See "List Users with Search" in the Okta Users API documentation.
    searchAttribute = args[OKTA_USER_SEARCH_INPUT]
    logger.info('Using attribute={} to do a lookup for value={}'.format(searchAttribute, encoded_username))
    query = '{} eq \"{}\"'.format(searchAttribute, username)
    searchUrl = '/api/v1/users/?search=' + quote(query)
    usernameUrl = baseUrl + searchUrl
    logger.info("Okta search url is {}".format(usernameUrl))
    usernameResponse = session.get(usernameUrl, timeout=request_timeout)

    _raiseIfRateLimited(username, usernameResponse, "user")
    if usernameResponse.status_code != 200:
        if usernameResponse.status_code == 400:
            errMsg = "It appears you are using a search parameter that does not exist. " \
                     "Search the Okta documentation for examples: " \
                     "\"Okta Users API - Okta Developer\" / \"List Users with Search\""
        elif usernameResponse.status_code == 401:
            errMsg = badCredentialsMsg
        elif usernameResponse.status_code == 404:
            errMsg = "User not found. The user you are querying ({}) does not exist".format(username)
        else:
            errMsg = "Failed to get user info for username={} with status={} and response={}".format(username, usernameResponse.status_code, usernameResponse.text)
        raise _LookupFailed(errMsg)

    searchResults = _parseUserResponse(username, usernameResponse)
    if not searchResults:
        raise _LookupFailed("Search query returned an empty response using attribute={} to do a lookup for value={}".format(searchAttribute, encoded_username))
    if not isinstance(searchResults, list):
        raise _LookupFailed("Failed to parse user info for username={} with error={}".format(username, "search response is not a list"))
    if len(searchResults) > 1:
        # The full response goes to the log only; the returned message stays short.
        logger.error("Returned more than one result while fetching get user info for username={} with user response status={} and user response={}. Check your search criteria.".format(username, usernameResponse.status_code, usernameResponse.text))
        raise _LookupFailed("Returned more than one result while fetching get user info for username={}. "
                            "Check your search criteria.".format(username))

    try:
        nameAttributes = searchResults[0]
        groupsUrl = baseUrl + '/api/v1/users/' + nameAttributes['id'] + '/groups'
    except (KeyError, IndexError, TypeError) as e:
        raise _LookupFailed("Failed to parse user info for username={} with error={}".format(username, str(e))) from e

    if not isinstance(nameAttributes, dict) or 'status' not in nameAttributes:
        raise _LookupFailed("Failed to parse user info for username={}, status not present in response output".format(username))
    return nameAttributes, groupsUrl, usernameResponse


def _fetchGroupNames(session, groupsUrl, username, encodeOutput):
    """Collect the user's group names from every page of the groups listing.

    Follows the 'next' link of each response until there is none. Names are
    base64url-encoded when ``encodeOutput`` is true. Raises _LookupFailed on
    any HTTP or parsing failure.
    """
    groupNames = []
    while groupsUrl:
        logger.info("Okta group url is {}".format(groupsUrl))
        groupsResponse = session.get(groupsUrl, timeout=request_timeout)

        _raiseIfRateLimited(username, groupsResponse, "group")
        if groupsResponse.status_code != 200:
            logger.error("Failed to get group info for username={} with group response status={} and group "
                         "response={}".format(username, groupsResponse.status_code, groupsResponse.text))
            raise _LookupFailed("Failed to get group info for username={} "
                                "with group response status={}".format(username, groupsResponse.status_code))

        try:
            pageNames = [group['profile']['name'] for group in json.loads(groupsResponse.text)]
        except (ValueError, KeyError, TypeError) as e:
            raise _LookupFailed("Failed to parse group info for username={} with status={} and response={}".format(username, groupsResponse.status_code, groupsResponse.text)) from e

        logger.info("Successfully obtained group names for username={} groups={}".format(username, pageNames))
        if encodeOutput:
            groupNames.extend(urlsafe_b64encode_to_str(name) for name in pageNames)
        else:
            groupNames.extend('{}'.format(name) for name in pageNames)

        nextLink = groupsResponse.links.get('next')
        groupsUrl = nextLink['url'] if nextLink else None

    logger.info("Successfully obtained group info for username={}".format(username))
    return groupNames


def getUserInfo(args):
    """Fetch a user's real name, email and group names from the Okta API.

    ``args`` is a dict with the keys:
      * 'username'  - user to look up; an empty value fails immediately.
      * 'baseUrl'   - base URL of the Okta instance.
      * 'apiKey'    - Okta API token.
      * OKTA_USER_SEARCH_INPUT (optional) - profile attribute to search on
        instead of fetching the user directly by username.
      * 'encodeOutput' (optional) - the string 'false' (any case) disables
        base64url encoding of the output fields.

    Returns one string. On success:
      SUCCESS + ' --userInfo=<username>;<real name>;<group1:group2:...>;<email>'
    followed by ' --encodedOutput=true' when the fields are base64url-encoded.
    On failure: FAILED + " " + ERROR_MSG + <message>.
    """
    username = args['username']
    if not username:
        return _failure("Username is empty. Not executing API call")

    logger.info("Running getUserInfo() for username={}".format(username))

    # default to always encode unless specified in args
    encodeOutput = not ('encodeOutput' in args and args['encodeOutput'].lower() == 'false')

    try:
        # Persistent connection; the context manager closes it on every exit path.
        with requests.Session() as session:
            # Base url and API key come from authentication.conf (scriptSecureArguments).
            # Okta API tokens are sent with the custom "SSWS" authentication scheme.
            session.headers = {
                'Accept': 'application/json',
                'Content-Type': 'application/json',
                'Authorization': 'SSWS ' + args['apiKey'],
            }

            nameAttributes, groupsUrl, usernameResponse = _fetchUser(session, args, username)
            status = nameAttributes['status']

            # Tolerate a missing or null profile instead of crashing on it.
            profile = nameAttributes.get('profile') or {}
            if profile:
                displayName, email, login = profile.get("displayName"), profile.get("email"), profile.get("login")
                logger.info("Successfully obtained user info for username={} with user response status={} and user "
                            "response: displayName={} email={} login={}".format(username, usernameResponse.status_code, displayName, email, login))

            # Available statuses : Staged, Pending User Action, Active, Password Reset, Locked Out, Suspended, Deactivated
            # NOTE(review): assumes the API reports active users as status 'ACTIVE' - confirm against the Okta Users API.
            if status != 'ACTIVE':
                raise _LookupFailed("User is not active in IdP for username={} with user status={}".format(username, status))

            groupNames = _fetchGroupNames(session, groupsUrl, username, encodeOutput)
    except _LookupFailed as failure:
        return _failure(str(failure))
    except requests.exceptions.RequestException as e:
        # Timeouts and connection errors become a regular failure line, not a traceback.
        errMsg = "Request to IdP failed for username={} with error={}".format(username, str(e))
        logger.error(errMsg)
        return _failure(errMsg)

    # Missing or null name parts are treated as empty strings.
    realNameString = (profile.get('firstName') or '') + ' ' + (profile.get('lastName') or '')
    emailString = profile.get('email') or ''
    # Join once over all pages so groups from consecutive pages stay ':'-separated.
    roleString = ":".join(groupNames)

    if encodeOutput:
        logger.info("base64 encoding script output")
        fullString = '{} --userInfo={};{};{};{} --encodedOutput=true'.format(
            SUCCESS,
            urlsafe_b64encode_to_str(username),
            urlsafe_b64encode_to_str(realNameString),
            roleString,
            urlsafe_b64encode_to_str(emailString))
    else:
        logger.info("Not base64 encoding script output")
        fullString = '{} --userInfo={};{};{};{}'.format(SUCCESS, username, realNameString, roleString, emailString)

    logger.info("getUserInfo() successful for username={}".format(username))
    return fullString
if __name__ == "__main__":
    # The first command-line argument names the function to run; the function's
    # inputs are obtained from readInputs() (defined outside this file's view).
    callName = sys.argv[1]
    dictIn = readInputs()

    if callName == "getUserInfo":
        response = getUserInfo(dictIn)
        # Hand the result line back to the caller on stdout.
        # NOTE(review): assumes the caller reads the result from stdout - confirm
        # against the Splunk scripted-authentication contract.
        print(response)

Powered by BW's shoe-string budget.