Exception raised when an error message is received from a service.
Exception raised when you try to request chunks from a closed request.
Event loop wrapper
Timer wrapper
Represents abstract cocaine service.
It provides basic service operations like getting its actual network address, determining if the service is connecting or connected.
There are no other useful public methods, so the main purpose of this class is to serve as a superclass for actual services or service-like objects (e.g. Locator).
Return actual network address (sockaddr) of the current service if it is connected.
The returned sockaddr is a tuple describing a socket address, whose format depends on the address family: an (address, port) 2-tuple for AF_INET, or an (address, port, flow info, scope id) 4-tuple for AF_INET6. It is meant to be passed to the socket.connect() method.
If the service is not connected, this method returns the tuple ('NOT_CONNECTED', 0).
Disconnects the service from its endpoint and destroys all communication between them.
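A minimal sketch of how calling code might interpret the returned tuple, using only the standard library. The helper name describe_address and the port number are hypothetical and not part of the cocaine API; only the tuple shapes and the ('NOT_CONNECTED', 0) sentinel come from the description above.

```python
import socket

NOT_CONNECTED = ('NOT_CONNECTED', 0)  # sentinel described above


def describe_address(sockaddr):
    """Hypothetical helper: classify a sockaddr tuple by its shape."""
    if tuple(sockaddr) == NOT_CONNECTED:
        return None, 'service is not connected'
    if len(sockaddr) == 2:
        # (address, port) as used with AF_INET sockets
        host, port = sockaddr
        return socket.AF_INET, '{0}:{1}'.format(host, port)
    if len(sockaddr) == 4:
        # (address, port, flow info, scope id) as used with AF_INET6 sockets
        host, port, _flowinfo, _scope_id = sockaddr
        return socket.AF_INET6, '[{0}]:{1}'.format(host, port)
    raise ValueError('unexpected sockaddr shape: {0!r}'.format(sockaddr))


# The port below is an arbitrary illustration value.
print(describe_address(('127.0.0.1', 10053)))
print(describe_address(('::1', 10053, 0, 0)))
print(describe_address(NOT_CONNECTED))
```

Checking for the sentinel first keeps it from being mistaken for an AF_INET 2-tuple, since both have the same length.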
Note
This method does nothing if the service is not connected.
Return true if the service is in connected state.
Return true if the service is in connecting state.
Performs synchronous method invocation via direct socket usage without the participation of the event loop.
Returns generator of chunks.
Note
Left for backward compatibility, tests and similar purposes. Indiscriminate use of this method can lead to the summoning of Satan.
Warning
Do not mix synchronous and asynchronous usage of service!
Represents locator service.
Locator is a special service that can resolve other services in the cloud by name.
Note
Normally, you shouldn't use this class directly. It is used behind the scenes to resolve the endpoints of other services.
Connects to the locator at specified host and port.
The locator itself always runs on a well-known host and port.
Resolve service by its name.
Represents cocaine services or applications and provides API to communicate with them.
This is the main class you will use to manage cocaine services in Python. Let's start with a simple example:
>>> from cocaine.services import Service
>>> node = Service('node')
We have just created a node service object by passing its name to the cocaine.services.Service initializer. If no errors occurred, you can use it right away.
If the service is not available, you will see something like this:
>>> from cocaine.services import Service
>>> node = Service('WAT?')
Traceback (most recent call last):
...
cocaine.exceptions.ServiceError: error in service "locator" - the specified service is not available [1]
Behind the scenes, it has synchronously connected to the locator, resolved the service's API and connected to the service endpoint obtained by resolving. This is the normal way to use services.
If you don't want service initialization to block immediately, you can set the blockingConnect argument to False and then connect manually:
>>> from cocaine.services import Service
>>> node = Service('node', blockingConnect=False)
>>> node.connect()
You can also specify the locator's address by passing the host and port parameters like this:
>>> from cocaine.services import Service
>>> node = Service('node', host='localhost', port=666)
Note
If you opted out of connecting at initialization, you shouldn't pass locator endpoint information to the initializer, because the two options are mutually exclusive. Specify the endpoint later, when invoking the connect method.
Note
If you don't want to create a connection to the locator each time you create a service, you can use the connectThroughLocator method, which is designed specifically for such cases.
Note
The actual service API is built dynamically. Sorry, IDE users, there is no autocompletion :(
Represents a yieldable object for grouping asynchronous futures.
This class makes it possible to yield multiple yieldable objects at once in chain context. Program control returns after all of them have completed. The future results are placed in a list in the original order.
Typical usage:
from cocaine.services import Service
from cocaine.futures import chain

@chain.source
def func():
    r1, r2 = yield chain.All([s1.execute(), s2.execute()])
    print(r1, r2)

s1 = Service('s1')
s2 = Service('s2')
func()
If you already have a deferred, you can invoke the execute method and pass that deferred to it. This has the same effect as yielding.
Note
You can yield objects of this class only in chain context and only once. Think of this class as single-shot.
Note
All methods in this class are thread-safe.
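The ordering guarantee described above (results arrive in the original order, regardless of which future completes first) can be illustrated with a standard-library analogy. This is only a sketch built on threads; it is not how chain.All is implemented, and the names slow_echo and gather_in_order are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_echo(value, delay):
    """Stand-in for an asynchronous service call."""
    time.sleep(delay)
    return value


def gather_in_order(calls):
    """Run all calls concurrently; return results in submission order."""
    with ThreadPoolExecutor(max_workers=len(calls)) as pool:
        futures = [pool.submit(func, *args) for func, args in calls]
        # Iterating over the list (not as_completed) preserves the order.
        return [future.result() for future in futures]


# The first call finishes last, yet its result still comes first.
r1, r2 = gather_in_order([
    (slow_echo, ('first', 0.05)),
    (slow_echo, ('second', 0.01)),
])
print(r1, r2)
```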
Executes the asynchronous grouped future invocation and binds the deferred to the completion event.
Parameters: deferred – deferred, which will be invoked after all of the futures have completed.
Represents a pipeline of processing functions over chunks.
This class represents a chain of processing functions over incoming chunks. It builds the chunk pipeline by binding the functions one by one. Incoming chunks are processed separately, in order. If a processing function fails and raises an exception, the exception is passed on to each subsequent chain item until it is caught by an except block or reaches the event loop exception trap.
A synchronous API is also provided, but it should be used only for scripting or tests.
Returns the next result of chain execution. If the chain has not completed after timeout seconds, a TimeoutError will be raised.
The default implementation simply starts the event loop, sets the timeout condition and runs the chain expression. The event loop is stopped after the chain result is received or after the timeout expires. It is correct to call this method multiple times to receive multiple chain results, provided you know exactly how many chunks there will be. A ChokeEvent will be raised if there are no more chunks to process.
Warning
This is synchronous usage of chain object. Do not mix asynchronous and synchronous chain usage!
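Parameters: timeout – Timeout in seconds after which TimeoutError will be raised. If timeout is not set (the default), the call waits forever.
Raises: TimeoutError if the chain has not completed within timeout seconds; ChokeEvent if there are no more chunks to process.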
Reports whether the chain object has a pending result that can be taken from it.
Gets the next chain result. Normally, you should not use this method directly, because Python calls it automatically in a for loop.
Warning
This is synchronous usage of chain object. Do not mix asynchronous and synchronous chain usage!
Puts the specified chunk processing function into the chain pipeline.
With this method, you can create a pipeline of several chunk handlers. Chunks are processed asynchronously and then passed to the next chain item. If an error occurs in the middle of the chain and nothing catches it, it is passed further down the pipeline, so make sure you catch all the exceptions you need to and handle them correctly.
Parameters: func – chunk processing function or method. Its signature must have one parameter of class FutureResult if the specified function is not the chunk source. If the function is the chunk source (e.g. a service execution method), then its signature must take no parameters.
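The error propagation described above can be sketched in a few lines. The example below is synchronous for brevity and is not the cocaine implementation: Result is a hypothetical stand-in for FutureResult, and run_pipeline is a hypothetical driver that hands each handler the previous outcome, whether a value or an exception.

```python
class Result(object):
    """Hypothetical stand-in for FutureResult: a value or an exception."""

    def __init__(self, value):
        self.value = value

    def get(self):
        if isinstance(self.value, Exception):
            raise self.value
        return self.value


def run_pipeline(chunk, handlers):
    """Pass one chunk through every handler, carrying errors forward."""
    result = Result(chunk)
    for handler in handlers:
        try:
            result = Result(handler(result))
        except Exception as err:
            # The error becomes the input of the next handler.
            result = Result(err)
    return result


def parse(result):
    return int(result.get())  # raises ValueError on bad input


def double(result):
    return result.get() * 2  # re-raises an upstream error, if any


def recover(result):
    try:
        return result.get()
    except ValueError:
        return -1  # the error is caught here and replaced by a default


print(run_pipeline('21', [parse, double, recover]).get())
print(run_pipeline('oops', [parse, double, recover]).get())
```

In the second call the ValueError raised by parse passes through double untouched and is only handled in recover, which mirrors the advice to catch exceptions in the chain item that knows how to deal with them.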
Waits for chain execution to finish, either for a limited time or forever.
This method gives you a convenient way to wait for the future result of a chain expression. The default implementation simply starts the event loop, sets the timeout condition and runs the chain expression. The event loop is stopped after the final chain result is received or after the timeout expires. Unlike the get method, no exception is raised if the timeout expires while the chain is still executing.
Warning
This is synchronous usage of chain object. Do not mix asynchronous and synchronous chain usage!
Parameters: timeout – Timeout in seconds after which the event loop will be stopped. If timeout is not set (the default), the call waits forever.
Raises ValueError: If timeout is set and is less than 1 ms.
Deferred future result.
This class represents the deferred result of an asynchronous operation. It is designed specifically to be returned from functions that are likely to be used in Chain context.
In typical usage, you create a Deferred object, keep it somewhere, start the asynchronous operation and return the deferred from the function. When the asynchronous operation is done, just invoke ready and pass the result (which may be an exception) to it.
Here is an example of an asynchronous function that starts a timer and signals the deferred after 1.0 second:
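import time

from tornado.ioloop import IOLoop
from cocaine.futures.chain import Deferred  # assumed import path

def timer_function():
    deferred = Deferred()
    timeout = 1.0
    # add_timeout takes an absolute deadline and a callback
    IOLoop.current().add_timeout(time.time() + timeout, lambda: deferred.ready('Done'))
    return deferred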
Now you can use timer_function in Chain context:
result = yield timer_function()
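To make the create, return, signal-later pattern concrete without requiring cocaine or tornado, here is a standard-library sketch. MiniDeferred is a hypothetical single-shot class written for this illustration; it is not the real Deferred, and its bind and ready methods only mimic the roles described in this document.

```python
import threading


class MiniDeferred(object):
    """Hypothetical single-shot deferred; not the cocaine implementation."""

    def __init__(self):
        self._callback = None
        self._result = None
        self._is_ready = False
        self._lock = threading.Lock()

    def bind(self, callback):
        with self._lock:
            self._callback = callback
            fire = self._is_ready
        if fire:
            # The result arrived before anyone was listening.
            callback(self._result)

    def ready(self, result):
        with self._lock:
            self._result = result
            self._is_ready = True
            callback = self._callback
        if callback is not None:
            callback(result)


def timer_function(timeout=0.05):
    deferred = MiniDeferred()
    # Signal the deferred from a timer thread once the timeout elapses.
    threading.Timer(timeout, deferred.ready, args=('Done',)).start()
    return deferred


done = threading.Event()
received = []


def on_result(result):
    received.append(result)
    done.set()


timer_function().bind(on_result)
done.wait(1.0)
print(received)
```

The lock ensures the callback fires exactly once whichever of bind and ready happens first.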
Represents a future result and provides methods to obtain, manipulate or reset it.
The result itself can be any object or exception. If an exception is stored, it will be raised when the user invokes the get method.
Note
All methods in this class are thread safe.
Extracts the future result from the object.
If an exception is stored in this object, then it will be raised, so surround dangerous code with try/except blocks.
>>> FutureResult(1).get()
1
>>> FutureResult(ValueError('ErrorMessage')).get()
Traceback (most recent call last):
...
ValueError: ErrorMessage
Represents a prepared future object whose result is known in advance.
It is useful when you need to return an already defined result from a function and use that function in some future context (such as a chain).
The specified callback or errorback will be triggered on the next event loop turn after the bind method is invoked.
Note
While in chain context, you don't need to use it directly. If you return something from a function that is meant to be used as a chain item, the result will be automatically wrapped in a PreparedFuture.
Note
All methods in this class are thread safe.
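The "next event loop turn" behaviour can be sketched with asyncio from the standard library. This is an analogy under stated assumptions, not the cocaine implementation: MiniPreparedFuture is a hypothetical class, and the bind signature shown here is assumed for illustration only.

```python
import asyncio


class MiniPreparedFuture(object):
    """Hypothetical future whose result is known at construction time."""

    def __init__(self, result, loop):
        self._result = result
        self._loop = loop

    def bind(self, callback, errorback=None):
        # Never call back synchronously: schedule on the next loop turn.
        if isinstance(self._result, Exception) and errorback is not None:
            self._loop.call_soon(errorback, self._result)
        else:
            self._loop.call_soon(callback, self._result)


events = []
loop = asyncio.new_event_loop()
MiniPreparedFuture(42, loop).bind(events.append)
events.append('bind returned')  # runs before the callback fires
loop.call_soon(loop.stop)       # stop once the queued callback has run
loop.run_forever()
loop.close()
print(events)
```

Deferring the callback keeps the caller's control flow the same whether the result is already known or arrives later.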
Wraps a function or method so that it can be invoked concurrently by yielding it in chain context.
Program control returns to the yield statement once processing is done. The current implementation invokes the function in a separate thread.
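A rough standard-library sketch of the same idea: a decorator that pushes the wrapped function onto a worker thread and hands back a future at once. The decorator name run_in_thread and the pool size are hypothetical; the real decorator integrates with chain context, which this sketch does not attempt to reproduce.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from functools import wraps

_pool = ThreadPoolExecutor(max_workers=4)


def run_in_thread(func):
    """Hypothetical decorator: calling func returns a Future immediately."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        return _pool.submit(func, *args, **kwargs)
    return wrapper


@run_in_thread
def heavy_sum(n):
    # Report which thread did the work along with the result.
    return sum(range(n)), threading.current_thread().name


future = heavy_sum(1000)
total, worker_name = future.result(timeout=5.0)
print(total, worker_name != threading.current_thread().name)
_pool.shutdown()
```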
Marks a function or method as the source of a chain context.
This means that the decorated function becomes the first function in the chain pipeline. As a result, no parameters should be passed to that function (except self or cls for class methods).
Let's get all application names from the node service and print them:
from cocaine.services import Service
node = Service('node')
apps = node.list().get()
print(apps)
This method blocks execution of the current client code until at least one chunk is received or an exception is thrown. On success, the chunk is returned from the method; otherwise, the exception is reraised.
A timeout can be specified by passing a keyword argument:
>>> apps = node.list().get(timeout=1.0)
Note
This method starts the event loop and stops it after a chunk is received. If the event loop is already running when the get method is invoked, it will not be stopped. In any case, mixing asynchronous and synchronous usage of services is not recommended, because there are other mechanisms for obtaining results while the event loop is running.
Warning
Use the get method only in scripts!
Let's get all application names from the node service and print them while the event loop is running (for example in Worker context or during some other asynchronous event):
from tornado.ioloop import IOLoop
from cocaine.futures import chain
from cocaine.services import Service

node = Service('node')

@chain.source
def magic():
    apps = yield node.list()
    print(apps)

magic()
IOLoop.current().start()
You can also use tornado and Python 3.3 futures in Chain context. Let's download a list of pages simultaneously and print the response time of each:
from tornado.ioloop import IOLoop
from tornado.httpclient import AsyncHTTPClient
from cocaine.futures import chain

client = AsyncHTTPClient()

@chain.source
def downloadInternet(url):
    response = yield client.fetch(url)
    print(response.request.url, response.request_time)

urls = [
    'http://yandex.ru',
    'http://www.google.ru',
    'http://www.google.com',
    'https://cocaine.readthedocs.org/en/latest/',
    'https://github.com/cocaine/cocaine-core',
    'https://github.com/cocaine/cocaine-framework-native',
    'https://github.com/cocaine/cocaine-framework-python',
    'https://github.com/cocaine/cocaine-plugins',
    'http://www.tornadoweb.org/en/stable/httpclient.html',
]

for url in urls:
    downloadInternet(url)

IOLoop.current().start()
When it is done, each URL will have been printed together with its response time. Note that the URLs are printed as their responses arrive, so the output order generally differs from the order of the urls list.
This is typical usage of the cocaine Python framework.
To simplify code, there is the @chain.source decorator, which patches the function and creates a Chain object from it. The decorated function will be executed automatically when the event loop is started.
While in Chain context, you can use the yield statement on any callable object that returns cocaine.futures.Future objects or their subclasses, cocaine.futures.chain objects, Python futures (including tornado futures) or any simple objects.
When the asynchronous operation completes, program control returns to the yield statement, which evaluates to the actual result. If an exception is thrown while the asynchronous function is being processed, it is rethrown on the client side right at the yield statement, so be prepared to catch it. If it is not caught, it will propagate down to the event loop and be lost forever.
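The mechanism of returning results and rethrowing exceptions at the yield statement can be demonstrated with plain Python generators. The drive function below is a hypothetical, synchronous stand-in for the chain machinery (it simply calls each yielded callable), so it shows the control flow only, not the real asynchronous scheduling.

```python
def drive(generator_func):
    """Hypothetical driver: resolves yielded callables synchronously."""
    gen = generator_func()
    try:
        operation = next(gen)
        while True:
            try:
                value = operation()
            except Exception as err:
                # Rethrow the failure at the yield statement.
                operation = gen.throw(err)
            else:
                # Resume the generator with the actual result.
                operation = gen.send(value)
    except StopIteration:
        pass  # the generator has finished


log = []


def failing():
    raise IOError('service unavailable')


def client():
    value = yield (lambda: 'chunk')
    log.append(value)
    try:
        yield failing
    except IOError as err:
        log.append('caught: {0}'.format(err))


drive(client)
print(log)
```

If client did not wrap the second yield in try/except, the IOError would propagate out of drive, which corresponds to the exception reaching the event loop.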
Note
If you want to write cocaine applications, the yield-based approach is recommended.
Note
If you need more examples, check the cocaine.tools package, where you can find many real-world uses of the asynchronous chain API.