#
# Copyright 2015 Palantir Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Logic for storing and calling jsonrpc methods."""
import inspect
import json
import six
import sys
import typedjsonrpc.parameter_checker as parameter_checker
import wrapt
from typedjsonrpc.errors import (Error, InternalError, InvalidRequestError, MethodNotFoundError,
ParseError)
from typedjsonrpc.method_info import MethodInfo, MethodSignature
from werkzeug.debug.tbtools import get_current_traceback
# Public API of this module: only the Registry class is exported.
__all__ = ["Registry"]
class Registry(object):
    """The registry for storing and calling jsonrpc methods.

    :attribute debug: Debug option which enables recording of tracebacks
    :type debug: bool
    :attribute tracebacks: Tracebacks for debugging
    :type tracebacks: dict[int, werkzeug.debug.tbtools.Traceback]

    .. versionadded:: 0.1.0
    """

    json_encoder = json.JSONEncoder()
    """The JSON encoder to use. Defaults to :class:`json.JSONEncoder`

    .. versionadded:: 0.1.0
    .. versionchanged:: 0.2.0 Changed from class to instance
    """

    json_decoder = json.JSONDecoder()
    """The JSON decoder to use. Defaults to :class:`json.JSONDecoder`

    .. versionadded:: 0.1.0
    .. versionchanged:: 0.2.0 Changed from class to instance
    """

    def __init__(self, debug=False):
        """
        :param debug: If True, the registry records tracebacks for debugging purposes
        :type debug: bool
        """
        self._name_to_method_info = {}
        self._register_describe()
        self.debug = debug
        self.tracebacks = {}

    def _register_describe(self):
        """Registers the built-in ``rpc.describe`` endpoint, which forwards to :meth:`describe`."""
        def _describe():
            return self.describe()

        # ``register`` rejects bound methods, so a plain function is registered instead and
        # given the documentation of the method it forwards to.
        _describe.__doc__ = self.describe.__doc__
        describe_signature = MethodSignature.create([], {}, dict)
        self.register("rpc.describe", _describe, describe_signature)

    def dispatch(self, request):
        """Takes a request and dispatches its data to a jsonrpc method.

        :param request: a werkzeug request with json data
        :type request: werkzeug.wrappers.Request
        :return: json output of the corresponding method, or None when there is nothing to
            send back (every message was a notification)
        :rtype: str | None

        .. versionadded:: 0.1.0
        """
        def _wrapped():
            messages = self._get_request_messages(request)
            results = [self._dispatch_and_handle_errors(message) for message in messages]
            # Notifications produce None and must not appear in the response.
            non_notification_results = [x for x in results if x is not None]
            if not non_notification_results:
                return None
            if len(messages) == 1:
                # A request carrying exactly one message is answered with a bare object.
                return non_notification_results[0]
            return non_notification_results

        result, _ = self._handle_exceptions(_wrapped)
        if result is not None:
            return self._encode_complete_result(result)
        return None

    def _dispatch_and_handle_errors(self, msg):
        """Dispatches a single message and converts any failure into an error response.

        :param msg: One decoded jsonrpc message
        :type msg: object
        :return: A response object, or None if the message was a notification
        :rtype: dict[str, object] | None
        """
        # A dict without an "id" key is treated as a notification: no response is produced.
        is_notification = isinstance(msg, dict) and "id" not in msg

        def _wrapped():
            result = self._dispatch_message(msg)
            if not is_notification:
                return Registry._create_result_response(msg["id"], result)
            return None

        result, _ = self._handle_exceptions(_wrapped, is_notification, self._get_id_if_known(msg))
        return result

    def _handle_exceptions(self, method, is_notification=False, msg_id=None):
        """Calls ``method`` and translates exceptions into jsonrpc error responses.

        :param method: Zero-argument callable to run
        :type method: () -> object
        :param is_notification: If True, failures are swallowed and no error response is built
        :type is_notification: bool
        :param msg_id: The id to put into an error response
        :type msg_id: object
        :return: ``(value, False)`` if ``method`` succeeded, otherwise ``(error_response, True)``
            where ``error_response`` is None for notifications
        :rtype: (object, bool)
        """
        try:
            return method(), False
        except Error as exc:
            if is_notification:
                # Always hand back a pair: callers unpack the return value, so falling off
                # the end here would replace the swallowed error with a TypeError.
                return None, True
            if self.debug:
                debug_url = self._store_traceback()
                exc.data = {"message": exc.data, "debug_url": debug_url}
            return Registry._create_error_response(msg_id, exc), True
        except Exception:  # pylint: disable=broad-except
            if is_notification:
                return None, True
            exc_info = sys.exc_info()
            debug_url = self._store_traceback() if self.debug else None
            new_error = InternalError.from_error(exc_info, self.json_encoder, debug_url)
            return Registry._create_error_response(msg_id, new_error), True

    def _encode_complete_result(self, result):
        """Encodes either a single response or a list of responses as a json string.

        :param result: A response object or a list of response objects
        :type result: dict[str, object] | list[dict[str, object]]
        :return: The json text
        :rtype: str
        """
        if isinstance(result, list):
            # Each element is encoded on its own so that one unencodable response only
            # replaces itself with an error instead of failing the whole batch.
            return '[' + ','.join([self._encode_single_result(res) for res in result]) + ']'
        return self._encode_single_result(result)

    def _encode_single_result(self, result):
        """Encodes one response; if that fails, encodes an error response in its place.

        :param result: A response object
        :type result: dict[str, object]
        :return: The json text
        :rtype: str
        """
        msg_id = Registry._get_id_if_known(result)

        def _encode():
            return self.json_encoder.encode(result)

        # ``result`` is a response that is going to be sent, so an encoding failure is
        # never suppressed, even when the id is unknown (None).
        encoded, is_error = self._handle_exceptions(_encode,
                                                    is_notification=False,
                                                    msg_id=msg_id)
        if is_error:
            # Fall back to default because previous encoding didn't work.
            return self.json_encoder.encode(encoded)
        return encoded

    def _store_traceback(self):
        """Records the traceback of the exception currently being handled.

        The traceback is stored in ``self.tracebacks`` under its id.

        :return: The path under which the stored traceback is addressed
        :rtype: str
        """
        traceback = get_current_traceback(skip=1,
                                          show_hidden_frames=False,
                                          ignore_system_exceptions=True)
        self.tracebacks[traceback.id] = traceback
        return "/debug/{}".format(traceback.id)

    @staticmethod
    def _get_id_if_known(msg):
        """Returns ``msg["id"]`` when ``msg`` is a dict containing an id, otherwise None.

        :param msg: A decoded message or a response object
        :type msg: object
        :rtype: object | None
        """
        if isinstance(msg, dict) and "id" in msg:
            return msg["id"]
        return None

    def _dispatch_message(self, msg):
        """Validates a message and calls the registered method it names.

        :param msg: One decoded jsonrpc message
        :type msg: dict[str, object]
        :return: Whatever the registered method returns
        :rtype: object
        :raises InvalidRequestError: If the message is malformed or its params are neither a
            list nor a dict
        :raises MethodNotFoundError: If no method is registered under the given name
        """
        self._check_request(msg)
        method = self._name_to_method_info[msg["method"]].method
        params = msg.get("params", [])
        parameter_checker.validate_params_match(method, params)
        if isinstance(params, list):
            return method(*params)
        if isinstance(params, dict):
            return method(**params)
        raise InvalidRequestError("Given params '{}' are neither a list nor a dict."
                                  .format(msg["params"]))

    @staticmethod
    def _create_result_response(msg_id, result):
        """Builds a jsonrpc 2.0 success response object.

        :param msg_id: The id of the request being answered
        :type msg_id: object
        :param result: The value returned by the method
        :type result: object
        :rtype: dict[str, object]
        """
        return {
            "jsonrpc": "2.0",
            "id": msg_id,
            "result": result,
        }

    @staticmethod
    def _create_error_response(msg_id, exc):
        """Builds a jsonrpc 2.0 error response object.

        :param msg_id: The id of the request being answered, or None if unknown
        :type msg_id: object
        :param exc: The error to report; its ``as_error_object()`` becomes the "error" member
        :type exc: Error
        :rtype: dict[str, object]
        """
        return {
            "jsonrpc": "2.0",
            "id": msg_id,
            "error": exc.as_error_object(),
        }

    def register(self, name, method, method_signature=None):
        """Registers a method with a given name and signature.

        An existing registration under the same name is replaced.

        :param name: The name used to register the method
        :type name: str
        :param method: The method to register
        :type method: function
        :param method_signature: The method signature for the given function
        :type method_signature: MethodSignature | None
        :raises Exception: If ``method`` is a bound method

        .. versionadded:: 0.1.0
        """
        if inspect.ismethod(method):
            raise Exception("typedjsonrpc does not support making class methods into endpoints")
        self._name_to_method_info[name] = MethodInfo(name, method, method_signature)

    def method(self, returns, **parameter_types):
        """Syntactic sugar for registering a method

        Example:

            >>> registry = Registry()
            >>> @registry.method(returns=int, x=int, y=int)
            ... def add(x, y):
            ...     return x + y

        :param returns: The method's return type
        :type returns: type
        :param parameter_types: The types of the method's parameters
        :type parameter_types: dict[str, type]

        .. versionadded:: 0.1.0
        """
        @wrapt.decorator
        def type_check_wrapper(method, instance, args, kwargs):
            """Wraps a method so that it is type-checked.

            :param method: The method to wrap
            :type method: (T) -> U
            :return: The result of calling the method with the given parameters
            :rtype: U
            """
            if instance is not None:
                raise Exception("Instance shouldn't be set.")

            parameter_names, defaults = Registry._get_argspec(method)
            parameters = self._collect_parameters(parameter_names, args, kwargs, defaults)

            parameter_checker.check_types(parameters, parameter_types)
            result = method(*args, **kwargs)
            parameter_checker.check_return_type(result, returns)
            return result

        def register_method(method):
            """Registers a method with its fully qualified name.

            :param method: The method to register
            :type method: function
            :return: The original method wrapped into a type-checker
            :rtype: function
            """
            parameter_names, _ = Registry._get_argspec(method)
            parameter_checker.check_type_declaration(parameter_names, parameter_types)

            wrapped_method = type_check_wrapper(method, None, None, None)
            fully_qualified_name = "{}.{}".format(method.__module__, method.__name__)
            self.register(fully_qualified_name, wrapped_method,
                          MethodSignature.create(parameter_names, parameter_types, returns))
            return wrapped_method

        return register_method

    @staticmethod
    def _get_argspec(method):
        """Returns the positional parameter names and default values of a function.

        :param method: The function to inspect
        :type method: function
        :return: The parameter names and the defaults (None if there are no defaults)
        :rtype: (list[str], tuple[object] | None)
        """
        # ``inspect.getargspec`` no longer exists on Python 3.11+, while ``getfullargspec``
        # does not exist on Python 2; use whichever this interpreter provides.
        get_spec = getattr(inspect, "getfullargspec", None) or inspect.getargspec
        spec = get_spec(method)
        return spec.args, spec.defaults

    @staticmethod
    def _collect_parameters(parameter_names, args, kwargs, defaults):
        """Creates a dictionary mapping parameters names to their values in the method call.

        :param parameter_names: The method's parameter names
        :type parameter_names: list[string]
        :param args: *args passed into the method
        :type args: list[object]
        :param kwargs: **kwargs passed into the method
        :type kwargs: dict[string, object]
        :param defaults: The method's default values
        :type defaults: list[object]
        :return: Dictionary mapping parameter names to values
        :rtype: dict[string, object]
        """
        parameters = {}
        if defaults is not None:
            # Defaults belong to the trailing parameters, so pair them up from the end.
            zipped_defaults = zip(reversed(parameter_names), reversed(defaults))
            for name, default in zipped_defaults:
                parameters[name] = default
        # Later sources win: positional arguments override defaults, keywords override both.
        for name, value in zip(parameter_names, args):
            parameters[name] = value
        for name, value in kwargs.items():
            parameters[name] = value
        return parameters

    def describe(self):
        """Returns a description of all the methods in the registry.

        :return: Description
        :rtype: dict[str, object]

        .. versionadded:: 0.1.0
        """
        return {
            "methods": [method_info.describe()
                        for method_info in sorted(self._name_to_method_info.values())]
        }

    def _get_request_messages(self, request):
        """Parses the request as a json message.

        :param request: a werkzeug request with json data
        :type request: werkzeug.wrappers.Request
        :return: The decoded messages; a non-list payload is wrapped into a one-element list
        :rtype: list[object]
        :raises ParseError: If the request data cannot be decoded
        """
        data = request.get_data(as_text=True)
        try:
            msg = self.json_decoder.decode(data)
        except Exception:
            # The decoder is replaceable, so any failure it raises counts as a parse error.
            raise ParseError("Could not parse request data '{}'".format(data))
        if isinstance(msg, list):
            return msg
        return [msg]

    def _check_request(self, msg):
        """Checks that the request json is well-formed.

        :param msg: The request's json data
        :type msg: dict[str, object]
        :raises InvalidRequestError: If the message is not a json object, or the version,
            method or id is missing or malformed
        :raises MethodNotFoundError: If no method is registered under the given name
        """
        if not isinstance(msg, dict):
            # Without this guard the membership tests below raise TypeError for values such
            # as numbers, which would be reported as an internal error.
            raise InvalidRequestError("Request must be a json object, but it was '{}'."
                                      .format(msg))
        if "jsonrpc" not in msg:
            raise InvalidRequestError("'\"jsonrpc\": \"2.0\"' must be included.")
        if msg["jsonrpc"] != "2.0":
            raise InvalidRequestError("'jsonrpc' must be exactly the string '2.0', but it was '{}'."
                                      .format(msg["jsonrpc"]))
        if "method" not in msg:
            raise InvalidRequestError("No method specified.")
        if "id" in msg:
            if msg["id"] is None:
                raise InvalidRequestError("typedjsonrpc does not allow id to be None.")
            if isinstance(msg["id"], float):
                raise InvalidRequestError("typedjsonrpc does not support float ids.")
            if not isinstance(msg["id"], (six.string_types, six.integer_types)):
                raise InvalidRequestError("id must be a string or integer; '{}' is of type {}."
                                          .format(msg["id"], type(msg["id"])))
        if msg["method"] not in self._name_to_method_info:
            raise MethodNotFoundError("Could not find method '{}'.".format(msg["method"]))