The documentation you are viewing is for Dapr v1.6 which is an older version of Dapr. For up-to-date documentation, see the latest version.

开始使用 Dapr Python gRPC 服务扩展

如何使用 Dapr Python gRPC 扩展包启动和运行

Dapr Python SDK 提供了一个内置的 gRPC 服务器扩展模块 dapr.ext.grpc,用于创建 Dapr 服务。

安装

您可以使用以下命令下载并安装 Dapr gRPC 服务器扩展模块:


pip install dapr-ext-grpc

pip3 install dapr-ext-grpc-dev

示例

App 对象可以用来创建服务器。

监听服务调用请求

InvokeMethodReqestInvokeMethodResponse 对象可用于处理传入请求。

监听并响应请求的简单服务将如下所示:

from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse

app = App()

@app.method(name='my-method')
def mymethod(request: InvokeMethodRequest) -> InvokeMethodResponse:
    print(request.metadata, flush=True)
    print(request.text(), flush=True)

    return InvokeMethodResponse(b'INVOKE_RECEIVED', "text/plain; charset=UTF-8")

app.run(50051)

完整的示例可以在 这里 找到。

订阅主题

from cloudevents.sdk.event import v1
from dapr.ext.grpc import App

app = App()

# Default subscription for a topic
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
def mytopic(event: v1.Event) -> None:
    print(event.Data(),flush=True)

# Specific handler using Pub/Sub routing
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
               rule=Rule("event.type == \"important\"", 1))
def mytopic_important(event: v1.Event) -> None:
    print(event.Data(),flush=True)

app.run(50051)

完整的示例可以在 这里 找到。

设置输入绑定触发器

from dapr.ext.grpc import App, BindingRequest

app = App()

@app.binding('kafkaBinding')
def binding(request: BindingRequest):
    print(request.text(), flush=True)

app.run(50051)

完整的示例可以在 这里 找到。

相关链接