Skip to content

A standalone nameko rpc proxy for asyncio and some wrappers for using nameko rpc proxy with asynchronous web frameworks(Sanic, fastapi).

License

Notifications You must be signed in to change notification settings

laiyongtao/aio_nameko_proxy

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

32 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

aio-nameko-proxy

A standalone nameko rpc proxy for asyncio and some wrappers for using nameko rpc proxy with asynchronous web frameworks(Sanic, fastapi).

This project is based on aio-pika and reference the source code of official nameko project and aio-pika.

install

pip install aio-nameko-proxy

examples:

standalone AIOClusterRpcProxy

If you want most of your messages to be persistent(default). Set the delivery mode parameter as DeliveryMode.PERSISTENT, Call sw_dlm_call when you need to send a non-persistent message.

import ssl
import asyncio
from aio_nameko_proxy import AIOClusterRpcProxy
from aio_pika import DeliveryMode

config = {
    "AMQP_URI": "amqp://guest:guest@127.0.0.1:5672",  # Required, 
    "rpc_exchange": "nameko-rpc",
    "time_out": 30, 
    "con_time_out": 5, 
    "delivery_mode": DeliveryMode.PERSISTENT,
    "serializer": "my_serializer",
    "ACCEPT": ["pickle", "json", "my_serializer"],
    "SERIALIZERS": {
        "my_serializer": {
            "encoder": "my_slizer.dumps",
            "decoder": "my_slizer.loads",
            "content_type": "my-content-type",
            "content_encoding": "utf-8"
        }
    },
    # If SSL is configured, Remember to change the URI to TLS port. eg: "amqps://guest:guest@127.0.0.1:5671"
    "AMQP_SSL": {
        'ca_certs': 'certs/ca_certificate.pem',  # or 'cafile': 'certs/ca_certificate.pem',
        'certfile': 'certs/client_certificate.pem',
        'keyfile': 'certs/client_key.pem',
        'cert_reqs': ssl.CERT_REQUIRED
    }
}

async def run():

    async with AIOClusterRpcProxy(config) as rpc:
            # time_out: the time_out of waitting the remote method result.
            # con_time_out: the time_out of connecting to the rabbitmq server or binding the queue, consume and so on.

            # persistent msg call
            result = await rpc.rpc_demo_service.normal_rpc("demo")
    
            reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
            result = await reply_obj.result()
    
            # non-persistent msg call
            result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")
    
            reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
            result = await reply_obj.result()


if __name__ == '__main__':

    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

If you want most of your messages to be non-persistent(persistent is default). Set the delivery mode parameter as DeliveryMode.NOT_PERSISTENT, Call sw_dlm_call when you need to send a persistent message.

import asyncio
from aio_nameko_proxy import AIOClusterRpcProxy
from aio_pika import DeliveryMode
config = {
    "AMQP_URI": "pyamqp://guest:guest@127.0.0.1:5672",
    "rpc_exchange": "nameko-rpc",
    "time_out": 30, 
    "con_time_out": 5, 
    "delivery_mode": DeliveryMode.NOT_PERSISTENT
}

async def run():
    async with AIOClusterRpcProxy(config) as rpc:
            # non-persistent msg call
            result = await rpc.rpc_demo_service.normal_rpc("demo")
    
            reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
            result = await reply_obj.result()
    
            # persistent msg call
            result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")
    
            reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
            result = await reply_obj.result()


if __name__ == '__main__':

    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

AIOPooledClusterRpcProxy

import asyncio
from aio_nameko_proxy import AIOPooledClusterRpcProxy
from aio_pika import DeliveryMode

config = {
    "AMQP_URI": "pyamqp://guest:guest@127.0.0.1:5672",
    "rpc_exchange": "nameko-rpc",
    "time_out": 30, 
    "con_time_out": 5,
    "pool_size": 10,
    "initial_size": 2,
    "delivery_mode": DeliveryMode.NOT_PERSISTENT
}


async def run():

    async with AIOPooledClusterRpcProxy(config) as proxy_pool:
    
            async with proxy_pool.acquire() as rpc:
                result = await rpc.rpc_demo_service.normal_rpc("demo")


if __name__ == '__main__':

    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

Sanic Wrapper

import ssl
from sanic import Sanic
from sanic.response import json
from aio_pika import DeliveryMode
from aio_nameko_proxy.wrappers import SanicNamekoClusterRpcProxy

class Config(object):
    # AMQP_URI: Required
    NAMEKO_AMQP_URI = "pyamqp://guest:guest@127.0.0.1:5672"
    # rpc_exchange
    NAMEKO_RPC_EXCHANGE = "nameko-rpc"
    # pool_size
    NAMEKO_POOL_SIZE = 60
    # initial_size
    NAMEKO_INITIAL_SIZE = 60
    # time_out
    NAMEKO_TIME_OUT = 30
    # con_time_out
    NAMEKO_CON_TIME_OUT = 5
    # serializer
    NAMEKO_SERIALIZER = "json"
    # ACCEPT
    NAMEKO_ACCEPT = ["pickle", "json"]
    # SERIALIZERS: custom serializers
    NAMEKO_SERIALIZERS = {
        "my_serializer": {
            "encoder": "my_slizer.dumps",
            "decoder": "my_slizer.loads",
            "content_type": "my-content-type",
            "content_encoding": "utf-8"
        }
    }
    # AMQP_SSL: ssl configs
    NAMEKO_AMQP_SSL = {
        'ca_certs': 'certs/ca_certificate.pem',  # or 'cafile': 'certs/ca_certificate.pem',
        'certfile': 'certs/client_certificate.pem',
        'keyfile': 'certs/client_key.pem',
        'cert_reqs': ssl.CERT_REQUIRED
    }
    # delivery_mode
    NAMEKO_DELIVERY_MODE = DeliveryMode.PERSISTENT
    # other supported properties of aio-pika.Message, the key name format is "NAMEKO_{}".format(property_name.upper())
    # ...


app = Sanic("App Name")
app.config.from_object(Config)

# rpc_cluster = SanicNamekoClusterRpcProxy(app)

# or

# from aio_nameko_proxy.wrappers import rpc_cluster  # contextvars required in py36
# SanicNamekoClusterRpcProxy(app)

# or
rpc_cluster = SanicNamekoClusterRpcProxy()
rpc_cluster.init_app(app)


@app.route("/")
async def test(request):
    
    rpc = await rpc_cluster.get_proxy()

    result = await rpc.rpc_demo_service.normal_rpc("demo")

    reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
    result = await reply_obj.result()

    result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")

    reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
    result = await reply_obj.result()

    return json({"hello": "world"})


@app.websocket('/ws')
async def ws(request, ws):
    
    rpc = await rpc_cluster.get_proxy()
    
    for i in range(3):
        _ = await ws.recv()
        result = await rpc.rpc_demo_service.normal_rpc("demo")
        await ws.send(result)
    ws.close()
    
    # in websocket handlers, you should call the remove actively in the end
    rpc_cluster.remove()



if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

FastAPI Wrapper

import ssl
from fastapi import FastAPI, WebSocket
from aio_pika import DeliveryMode
from pydantic import BaseSettings

from aio_nameko_proxy.wrappers import FastApiNamekoProxyMiddleware, rpc_cluster  # contextvars required in py36



class Settings(BaseSettings):

    # AMQP_URI: Required
    NAMEKO_AMQP_URI = "pyamqp://guest:guest@127.0.0.1:5672"
    # rpc_exchange
    NAMEKO_RPC_EXCHANGE = "nameko-rpc"
    # pool_size
    NAMEKO_POOL_SIZE = 60
    # initial_size
    NAMEKO_INITIAL_SIZE = 60
    # time_out
    NAMEKO_TIME_OUT = 30
    # con_time_out
    NAMEKO_CON_TIME_OUT = 5
    # serializer
    NAMEKO_SERIALIZER = "json"
    # ACCEPT
    NAMEKO_ACCEPT = ["pickle", "json"]
    # SERIALIZERS: custom serializers
    NAMEKO_SERIALIZERS = {
        "my_serializer": {
            "encoder": "my_slizer.dumps",
            "decoder": "my_slizer.loads",
            "content_type": "my-content-type",
            "content_encoding": "utf-8"
        }
    }
    # AMQP_SSL: ssl configs
    NAMEKO_AMQP_SSL = {
        'ca_certs': 'certs/ca_certificate.pem',  # or 'cafile': 'certs/ca_certificate.pem',
        'certfile': 'certs/client_certificate.pem',
        'keyfile': 'certs/client_key.pem',
        'cert_reqs': ssl.CERT_REQUIRED
    }
    # delivery_mode
    NAMEKO_DELIVERY_MODE = DeliveryMode.PERSISTENT
    # other supported properties of aio-pika.Message, the key name format is "NAMEKO_{}".format(property_name.upper())
    # ...

settings = Settings()

app = FastAPI()

app.add_middleware(FastApiNamekoProxyMiddleware, config=settings)

@app.get("/")
async def test():
    
    rpc = await rpc_cluster.get_proxy()

    result = await rpc.rpc_demo_service.normal_rpc("demo")

    reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
    result = await reply_obj.result()

    result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")

    reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
    result = await reply_obj.result()

    return {"hello": "world"}


@app.websocket("/ws")
async def ws(ws: WebSocket):
    await ws.accept()
    rpc = await rpc_cluster.get_proxy()
        
    for i in range(3):
        _ = await ws.receive()
        result = await rpc.rpc_demo_service.normal_rpc("demo")
        await ws.send(result)
    ws.close()
    
    # in websocket handlers, you should call the remove() actively in the end
    rpc_cluster.remove()

About

A standalone nameko rpc proxy for asyncio and some wrappers for using nameko rpc proxy with asynchronous web frameworks(Sanic, fastapi).

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages