You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
587 lines
20 KiB
587 lines
20 KiB
6 months ago
|
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||
|
#
|
||
|
# Licensed under the Apache License, Version 2.0 (the "License"). You
|
||
|
# may not use this file except in compliance with the License. A copy of
|
||
|
# the License is located at
|
||
|
#
|
||
|
# http://aws.amazon.com/apache2.0/
|
||
|
#
|
||
|
# or in the "license" file accompanying this file. This file is
|
||
|
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
|
||
|
# ANY KIND, either express or implied. See the License for the specific
|
||
|
# language governing permissions and limitations under the License.
|
||
|
import json
|
||
|
import logging
|
||
|
import re
|
||
|
import time
|
||
|
|
||
|
from botocore.compat import ensure_bytes, ensure_unicode, urlparse
|
||
|
from botocore.retryhandler import EXCEPTION_MAP as RETRYABLE_EXCEPTIONS
|
||
|
|
||
|
logger = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
class Monitor:
|
||
|
_EVENTS_TO_REGISTER = [
|
||
|
'before-parameter-build',
|
||
|
'request-created',
|
||
|
'response-received',
|
||
|
'after-call',
|
||
|
'after-call-error',
|
||
|
]
|
||
|
|
||
|
def __init__(self, adapter, publisher):
|
||
|
"""Abstraction for monitoring clients API calls
|
||
|
|
||
|
:param adapter: An adapter that takes event emitter events
|
||
|
and produces monitor events
|
||
|
|
||
|
:param publisher: A publisher for generated monitor events
|
||
|
"""
|
||
|
self._adapter = adapter
|
||
|
self._publisher = publisher
|
||
|
|
||
|
def register(self, event_emitter):
|
||
|
"""Register an event emitter to the monitor"""
|
||
|
for event_to_register in self._EVENTS_TO_REGISTER:
|
||
|
event_emitter.register_last(event_to_register, self.capture)
|
||
|
|
||
|
def capture(self, event_name, **payload):
|
||
|
"""Captures an incoming event from the event emitter
|
||
|
|
||
|
It will feed an event emitter event to the monitor's adaptor to create
|
||
|
a monitor event and then publish that event to the monitor's publisher.
|
||
|
"""
|
||
|
try:
|
||
|
monitor_event = self._adapter.feed(event_name, payload)
|
||
|
if monitor_event:
|
||
|
self._publisher.publish(monitor_event)
|
||
|
except Exception as e:
|
||
|
logger.debug(
|
||
|
'Exception %s raised by client monitor in handling event %s',
|
||
|
e,
|
||
|
event_name,
|
||
|
exc_info=True,
|
||
|
)
|
||
|
|
||
|
|
||
|
class MonitorEventAdapter:
|
||
|
def __init__(self, time=time.time):
|
||
|
"""Adapts event emitter events to produce monitor events
|
||
|
|
||
|
:type time: callable
|
||
|
:param time: A callable that produces the current time
|
||
|
"""
|
||
|
self._time = time
|
||
|
|
||
|
def feed(self, emitter_event_name, emitter_payload):
|
||
|
"""Feed an event emitter event to generate a monitor event
|
||
|
|
||
|
:type emitter_event_name: str
|
||
|
:param emitter_event_name: The name of the event emitted
|
||
|
|
||
|
:type emitter_payload: dict
|
||
|
:param emitter_payload: The payload to associated to the event
|
||
|
emitted
|
||
|
|
||
|
:rtype: BaseMonitorEvent
|
||
|
:returns: A monitor event based on the event emitter events
|
||
|
fired
|
||
|
"""
|
||
|
return self._get_handler(emitter_event_name)(**emitter_payload)
|
||
|
|
||
|
def _get_handler(self, event_name):
|
||
|
return getattr(
|
||
|
self, '_handle_' + event_name.split('.')[0].replace('-', '_')
|
||
|
)
|
||
|
|
||
|
def _handle_before_parameter_build(self, model, context, **kwargs):
|
||
|
context['current_api_call_event'] = APICallEvent(
|
||
|
service=model.service_model.service_id,
|
||
|
operation=model.wire_name,
|
||
|
timestamp=self._get_current_time(),
|
||
|
)
|
||
|
|
||
|
def _handle_request_created(self, request, **kwargs):
|
||
|
context = request.context
|
||
|
new_attempt_event = context[
|
||
|
'current_api_call_event'
|
||
|
].new_api_call_attempt(timestamp=self._get_current_time())
|
||
|
new_attempt_event.request_headers = request.headers
|
||
|
new_attempt_event.url = request.url
|
||
|
context['current_api_call_attempt_event'] = new_attempt_event
|
||
|
|
||
|
def _handle_response_received(
|
||
|
self, parsed_response, context, exception, **kwargs
|
||
|
):
|
||
|
attempt_event = context.pop('current_api_call_attempt_event')
|
||
|
attempt_event.latency = self._get_latency(attempt_event)
|
||
|
if parsed_response is not None:
|
||
|
attempt_event.http_status_code = parsed_response[
|
||
|
'ResponseMetadata'
|
||
|
]['HTTPStatusCode']
|
||
|
attempt_event.response_headers = parsed_response[
|
||
|
'ResponseMetadata'
|
||
|
]['HTTPHeaders']
|
||
|
attempt_event.parsed_error = parsed_response.get('Error')
|
||
|
else:
|
||
|
attempt_event.wire_exception = exception
|
||
|
return attempt_event
|
||
|
|
||
|
def _handle_after_call(self, context, parsed, **kwargs):
|
||
|
context['current_api_call_event'].retries_exceeded = parsed[
|
||
|
'ResponseMetadata'
|
||
|
].get('MaxAttemptsReached', False)
|
||
|
return self._complete_api_call(context)
|
||
|
|
||
|
def _handle_after_call_error(self, context, exception, **kwargs):
|
||
|
# If the after-call-error was emitted and the error being raised
|
||
|
# was a retryable connection error, then the retries must have exceeded
|
||
|
# for that exception as this event gets emitted **after** retries
|
||
|
# happen.
|
||
|
context[
|
||
|
'current_api_call_event'
|
||
|
].retries_exceeded = self._is_retryable_exception(exception)
|
||
|
return self._complete_api_call(context)
|
||
|
|
||
|
def _is_retryable_exception(self, exception):
|
||
|
return isinstance(
|
||
|
exception, tuple(RETRYABLE_EXCEPTIONS['GENERAL_CONNECTION_ERROR'])
|
||
|
)
|
||
|
|
||
|
def _complete_api_call(self, context):
|
||
|
call_event = context.pop('current_api_call_event')
|
||
|
call_event.latency = self._get_latency(call_event)
|
||
|
return call_event
|
||
|
|
||
|
def _get_latency(self, event):
|
||
|
return self._get_current_time() - event.timestamp
|
||
|
|
||
|
def _get_current_time(self):
|
||
|
return int(self._time() * 1000)
|
||
|
|
||
|
|
||
|
class BaseMonitorEvent:
|
||
|
def __init__(self, service, operation, timestamp):
|
||
|
"""Base monitor event
|
||
|
|
||
|
:type service: str
|
||
|
:param service: A string identifying the service associated to
|
||
|
the event
|
||
|
|
||
|
:type operation: str
|
||
|
:param operation: A string identifying the operation of service
|
||
|
associated to the event
|
||
|
|
||
|
:type timestamp: int
|
||
|
:param timestamp: Epoch time in milliseconds from when the event began
|
||
|
"""
|
||
|
self.service = service
|
||
|
self.operation = operation
|
||
|
self.timestamp = timestamp
|
||
|
|
||
|
def __repr__(self):
|
||
|
return f'{self.__class__.__name__}({self.__dict__!r})'
|
||
|
|
||
|
def __eq__(self, other):
|
||
|
if isinstance(other, self.__class__):
|
||
|
return self.__dict__ == other.__dict__
|
||
|
return False
|
||
|
|
||
|
|
||
|
class APICallEvent(BaseMonitorEvent):
|
||
|
def __init__(
|
||
|
self,
|
||
|
service,
|
||
|
operation,
|
||
|
timestamp,
|
||
|
latency=None,
|
||
|
attempts=None,
|
||
|
retries_exceeded=False,
|
||
|
):
|
||
|
"""Monitor event for a single API call
|
||
|
|
||
|
This event corresponds to a single client method call, which includes
|
||
|
every HTTP requests attempt made in order to complete the client call
|
||
|
|
||
|
:type service: str
|
||
|
:param service: A string identifying the service associated to
|
||
|
the event
|
||
|
|
||
|
:type operation: str
|
||
|
:param operation: A string identifying the operation of service
|
||
|
associated to the event
|
||
|
|
||
|
:type timestamp: int
|
||
|
:param timestamp: Epoch time in milliseconds from when the event began
|
||
|
|
||
|
:type latency: int
|
||
|
:param latency: The time in milliseconds to complete the client call
|
||
|
|
||
|
:type attempts: list
|
||
|
:param attempts: The list of APICallAttempts associated to the
|
||
|
APICall
|
||
|
|
||
|
:type retries_exceeded: bool
|
||
|
:param retries_exceeded: True if API call exceeded retries. False
|
||
|
otherwise
|
||
|
"""
|
||
|
super().__init__(
|
||
|
service=service, operation=operation, timestamp=timestamp
|
||
|
)
|
||
|
self.latency = latency
|
||
|
self.attempts = attempts
|
||
|
if attempts is None:
|
||
|
self.attempts = []
|
||
|
self.retries_exceeded = retries_exceeded
|
||
|
|
||
|
def new_api_call_attempt(self, timestamp):
|
||
|
"""Instantiates APICallAttemptEvent associated to the APICallEvent
|
||
|
|
||
|
:type timestamp: int
|
||
|
:param timestamp: Epoch time in milliseconds to associate to the
|
||
|
APICallAttemptEvent
|
||
|
"""
|
||
|
attempt_event = APICallAttemptEvent(
|
||
|
service=self.service, operation=self.operation, timestamp=timestamp
|
||
|
)
|
||
|
self.attempts.append(attempt_event)
|
||
|
return attempt_event
|
||
|
|
||
|
|
||
|
class APICallAttemptEvent(BaseMonitorEvent):
|
||
|
def __init__(
|
||
|
self,
|
||
|
service,
|
||
|
operation,
|
||
|
timestamp,
|
||
|
latency=None,
|
||
|
url=None,
|
||
|
http_status_code=None,
|
||
|
request_headers=None,
|
||
|
response_headers=None,
|
||
|
parsed_error=None,
|
||
|
wire_exception=None,
|
||
|
):
|
||
|
"""Monitor event for a single API call attempt
|
||
|
|
||
|
This event corresponds to a single HTTP request attempt in completing
|
||
|
the entire client method call.
|
||
|
|
||
|
:type service: str
|
||
|
:param service: A string identifying the service associated to
|
||
|
the event
|
||
|
|
||
|
:type operation: str
|
||
|
:param operation: A string identifying the operation of service
|
||
|
associated to the event
|
||
|
|
||
|
:type timestamp: int
|
||
|
:param timestamp: Epoch time in milliseconds from when the HTTP request
|
||
|
started
|
||
|
|
||
|
:type latency: int
|
||
|
:param latency: The time in milliseconds to complete the HTTP request
|
||
|
whether it succeeded or failed
|
||
|
|
||
|
:type url: str
|
||
|
:param url: The URL the attempt was sent to
|
||
|
|
||
|
:type http_status_code: int
|
||
|
:param http_status_code: The HTTP status code of the HTTP response
|
||
|
if there was a response
|
||
|
|
||
|
:type request_headers: dict
|
||
|
:param request_headers: The HTTP headers sent in making the HTTP
|
||
|
request
|
||
|
|
||
|
:type response_headers: dict
|
||
|
:param response_headers: The HTTP headers returned in the HTTP response
|
||
|
if there was a response
|
||
|
|
||
|
:type parsed_error: dict
|
||
|
:param parsed_error: The error parsed if the service returned an
|
||
|
error back
|
||
|
|
||
|
:type wire_exception: Exception
|
||
|
:param wire_exception: The exception raised in sending the HTTP
|
||
|
request (i.e. ConnectionError)
|
||
|
"""
|
||
|
super().__init__(
|
||
|
service=service, operation=operation, timestamp=timestamp
|
||
|
)
|
||
|
self.latency = latency
|
||
|
self.url = url
|
||
|
self.http_status_code = http_status_code
|
||
|
self.request_headers = request_headers
|
||
|
self.response_headers = response_headers
|
||
|
self.parsed_error = parsed_error
|
||
|
self.wire_exception = wire_exception
|
||
|
|
||
|
|
||
|
class CSMSerializer:
|
||
|
_MAX_CLIENT_ID_LENGTH = 255
|
||
|
_MAX_EXCEPTION_CLASS_LENGTH = 128
|
||
|
_MAX_ERROR_CODE_LENGTH = 128
|
||
|
_MAX_USER_AGENT_LENGTH = 256
|
||
|
_MAX_MESSAGE_LENGTH = 512
|
||
|
_RESPONSE_HEADERS_TO_EVENT_ENTRIES = {
|
||
|
'x-amzn-requestid': 'XAmznRequestId',
|
||
|
'x-amz-request-id': 'XAmzRequestId',
|
||
|
'x-amz-id-2': 'XAmzId2',
|
||
|
}
|
||
|
_AUTH_REGEXS = {
|
||
|
'v4': re.compile(
|
||
|
r'AWS4-HMAC-SHA256 '
|
||
|
r'Credential=(?P<access_key>\w+)/\d+/'
|
||
|
r'(?P<signing_region>[a-z0-9-]+)/'
|
||
|
),
|
||
|
's3': re.compile(r'AWS (?P<access_key>\w+):'),
|
||
|
}
|
||
|
_SERIALIZEABLE_EVENT_PROPERTIES = [
|
||
|
'service',
|
||
|
'operation',
|
||
|
'timestamp',
|
||
|
'attempts',
|
||
|
'latency',
|
||
|
'retries_exceeded',
|
||
|
'url',
|
||
|
'request_headers',
|
||
|
'http_status_code',
|
||
|
'response_headers',
|
||
|
'parsed_error',
|
||
|
'wire_exception',
|
||
|
]
|
||
|
|
||
|
def __init__(self, csm_client_id):
|
||
|
"""Serializes monitor events to CSM (Client Side Monitoring) format
|
||
|
|
||
|
:type csm_client_id: str
|
||
|
:param csm_client_id: The application identifier to associate
|
||
|
to the serialized events
|
||
|
"""
|
||
|
self._validate_client_id(csm_client_id)
|
||
|
self.csm_client_id = csm_client_id
|
||
|
|
||
|
def _validate_client_id(self, csm_client_id):
|
||
|
if len(csm_client_id) > self._MAX_CLIENT_ID_LENGTH:
|
||
|
raise ValueError(
|
||
|
f'The value provided for csm_client_id: {csm_client_id} exceeds '
|
||
|
f'the maximum length of {self._MAX_CLIENT_ID_LENGTH} characters'
|
||
|
)
|
||
|
|
||
|
def serialize(self, event):
|
||
|
"""Serializes a monitor event to the CSM format
|
||
|
|
||
|
:type event: BaseMonitorEvent
|
||
|
:param event: The event to serialize to bytes
|
||
|
|
||
|
:rtype: bytes
|
||
|
:returns: The CSM serialized form of the event
|
||
|
"""
|
||
|
event_dict = self._get_base_event_dict(event)
|
||
|
event_type = self._get_event_type(event)
|
||
|
event_dict['Type'] = event_type
|
||
|
for attr in self._SERIALIZEABLE_EVENT_PROPERTIES:
|
||
|
value = getattr(event, attr, None)
|
||
|
if value is not None:
|
||
|
getattr(self, '_serialize_' + attr)(
|
||
|
value, event_dict, event_type=event_type
|
||
|
)
|
||
|
return ensure_bytes(json.dumps(event_dict, separators=(',', ':')))
|
||
|
|
||
|
def _get_base_event_dict(self, event):
|
||
|
return {
|
||
|
'Version': 1,
|
||
|
'ClientId': self.csm_client_id,
|
||
|
}
|
||
|
|
||
|
def _serialize_service(self, service, event_dict, **kwargs):
|
||
|
event_dict['Service'] = service
|
||
|
|
||
|
def _serialize_operation(self, operation, event_dict, **kwargs):
|
||
|
event_dict['Api'] = operation
|
||
|
|
||
|
def _serialize_timestamp(self, timestamp, event_dict, **kwargs):
|
||
|
event_dict['Timestamp'] = timestamp
|
||
|
|
||
|
def _serialize_attempts(self, attempts, event_dict, **kwargs):
|
||
|
event_dict['AttemptCount'] = len(attempts)
|
||
|
if attempts:
|
||
|
self._add_fields_from_last_attempt(event_dict, attempts[-1])
|
||
|
|
||
|
def _add_fields_from_last_attempt(self, event_dict, last_attempt):
|
||
|
if last_attempt.request_headers:
|
||
|
# It does not matter which attempt to use to grab the region
|
||
|
# for the ApiCall event, but SDKs typically do the last one.
|
||
|
region = self._get_region(last_attempt.request_headers)
|
||
|
if region is not None:
|
||
|
event_dict['Region'] = region
|
||
|
event_dict['UserAgent'] = self._get_user_agent(
|
||
|
last_attempt.request_headers
|
||
|
)
|
||
|
if last_attempt.http_status_code is not None:
|
||
|
event_dict['FinalHttpStatusCode'] = last_attempt.http_status_code
|
||
|
if last_attempt.parsed_error is not None:
|
||
|
self._serialize_parsed_error(
|
||
|
last_attempt.parsed_error, event_dict, 'ApiCall'
|
||
|
)
|
||
|
if last_attempt.wire_exception is not None:
|
||
|
self._serialize_wire_exception(
|
||
|
last_attempt.wire_exception, event_dict, 'ApiCall'
|
||
|
)
|
||
|
|
||
|
def _serialize_latency(self, latency, event_dict, event_type):
|
||
|
if event_type == 'ApiCall':
|
||
|
event_dict['Latency'] = latency
|
||
|
elif event_type == 'ApiCallAttempt':
|
||
|
event_dict['AttemptLatency'] = latency
|
||
|
|
||
|
def _serialize_retries_exceeded(
|
||
|
self, retries_exceeded, event_dict, **kwargs
|
||
|
):
|
||
|
event_dict['MaxRetriesExceeded'] = 1 if retries_exceeded else 0
|
||
|
|
||
|
def _serialize_url(self, url, event_dict, **kwargs):
|
||
|
event_dict['Fqdn'] = urlparse(url).netloc
|
||
|
|
||
|
def _serialize_request_headers(
|
||
|
self, request_headers, event_dict, **kwargs
|
||
|
):
|
||
|
event_dict['UserAgent'] = self._get_user_agent(request_headers)
|
||
|
if self._is_signed(request_headers):
|
||
|
event_dict['AccessKey'] = self._get_access_key(request_headers)
|
||
|
region = self._get_region(request_headers)
|
||
|
if region is not None:
|
||
|
event_dict['Region'] = region
|
||
|
if 'X-Amz-Security-Token' in request_headers:
|
||
|
event_dict['SessionToken'] = request_headers[
|
||
|
'X-Amz-Security-Token'
|
||
|
]
|
||
|
|
||
|
def _serialize_http_status_code(
|
||
|
self, http_status_code, event_dict, **kwargs
|
||
|
):
|
||
|
event_dict['HttpStatusCode'] = http_status_code
|
||
|
|
||
|
def _serialize_response_headers(
|
||
|
self, response_headers, event_dict, **kwargs
|
||
|
):
|
||
|
for header, entry in self._RESPONSE_HEADERS_TO_EVENT_ENTRIES.items():
|
||
|
if header in response_headers:
|
||
|
event_dict[entry] = response_headers[header]
|
||
|
|
||
|
def _serialize_parsed_error(
|
||
|
self, parsed_error, event_dict, event_type, **kwargs
|
||
|
):
|
||
|
field_prefix = 'Final' if event_type == 'ApiCall' else ''
|
||
|
event_dict[field_prefix + 'AwsException'] = self._truncate(
|
||
|
parsed_error['Code'], self._MAX_ERROR_CODE_LENGTH
|
||
|
)
|
||
|
event_dict[field_prefix + 'AwsExceptionMessage'] = self._truncate(
|
||
|
parsed_error['Message'], self._MAX_MESSAGE_LENGTH
|
||
|
)
|
||
|
|
||
|
def _serialize_wire_exception(
|
||
|
self, wire_exception, event_dict, event_type, **kwargs
|
||
|
):
|
||
|
field_prefix = 'Final' if event_type == 'ApiCall' else ''
|
||
|
event_dict[field_prefix + 'SdkException'] = self._truncate(
|
||
|
wire_exception.__class__.__name__, self._MAX_EXCEPTION_CLASS_LENGTH
|
||
|
)
|
||
|
event_dict[field_prefix + 'SdkExceptionMessage'] = self._truncate(
|
||
|
str(wire_exception), self._MAX_MESSAGE_LENGTH
|
||
|
)
|
||
|
|
||
|
def _get_event_type(self, event):
|
||
|
if isinstance(event, APICallEvent):
|
||
|
return 'ApiCall'
|
||
|
elif isinstance(event, APICallAttemptEvent):
|
||
|
return 'ApiCallAttempt'
|
||
|
|
||
|
def _get_access_key(self, request_headers):
|
||
|
auth_val = self._get_auth_value(request_headers)
|
||
|
_, auth_match = self._get_auth_match(auth_val)
|
||
|
return auth_match.group('access_key')
|
||
|
|
||
|
def _get_region(self, request_headers):
|
||
|
if not self._is_signed(request_headers):
|
||
|
return None
|
||
|
auth_val = self._get_auth_value(request_headers)
|
||
|
signature_version, auth_match = self._get_auth_match(auth_val)
|
||
|
if signature_version != 'v4':
|
||
|
return None
|
||
|
return auth_match.group('signing_region')
|
||
|
|
||
|
def _get_user_agent(self, request_headers):
|
||
|
return self._truncate(
|
||
|
ensure_unicode(request_headers.get('User-Agent', '')),
|
||
|
self._MAX_USER_AGENT_LENGTH,
|
||
|
)
|
||
|
|
||
|
def _is_signed(self, request_headers):
|
||
|
return 'Authorization' in request_headers
|
||
|
|
||
|
def _get_auth_value(self, request_headers):
|
||
|
return ensure_unicode(request_headers['Authorization'])
|
||
|
|
||
|
def _get_auth_match(self, auth_val):
|
||
|
for signature_version, regex in self._AUTH_REGEXS.items():
|
||
|
match = regex.match(auth_val)
|
||
|
if match:
|
||
|
return signature_version, match
|
||
|
return None, None
|
||
|
|
||
|
def _truncate(self, text, max_length):
|
||
|
if len(text) > max_length:
|
||
|
logger.debug(
|
||
|
'Truncating following value to maximum length of ' '%s: %s',
|
||
|
text,
|
||
|
max_length,
|
||
|
)
|
||
|
return text[:max_length]
|
||
|
return text
|
||
|
|
||
|
|
||
|
class SocketPublisher:
|
||
|
_MAX_MONITOR_EVENT_LENGTH = 8 * 1024
|
||
|
|
||
|
def __init__(self, socket, host, port, serializer):
|
||
|
"""Publishes monitor events to a socket
|
||
|
|
||
|
:type socket: socket.socket
|
||
|
:param socket: The socket object to use to publish events
|
||
|
|
||
|
:type host: string
|
||
|
:param host: The host to send events to
|
||
|
|
||
|
:type port: integer
|
||
|
:param port: The port on the host to send events to
|
||
|
|
||
|
:param serializer: The serializer to use to serialize the event
|
||
|
to a form that can be published to the socket. This must
|
||
|
have a `serialize()` method that accepts a monitor event
|
||
|
and return bytes
|
||
|
"""
|
||
|
self._socket = socket
|
||
|
self._address = (host, port)
|
||
|
self._serializer = serializer
|
||
|
|
||
|
def publish(self, event):
|
||
|
"""Publishes a specified monitor event
|
||
|
|
||
|
:type event: BaseMonitorEvent
|
||
|
:param event: The monitor event to be sent
|
||
|
over the publisher's socket to the desired address.
|
||
|
"""
|
||
|
serialized_event = self._serializer.serialize(event)
|
||
|
if len(serialized_event) > self._MAX_MONITOR_EVENT_LENGTH:
|
||
|
logger.debug(
|
||
|
'Serialized event of size %s exceeds the maximum length '
|
||
|
'allowed: %s. Not sending event to socket.',
|
||
|
len(serialized_event),
|
||
|
self._MAX_MONITOR_EVENT_LENGTH,
|
||
|
)
|
||
|
return
|
||
|
self._socket.sendto(serialized_event, self._address)
|