The documentation you are viewing is for Dapr v1.6 which is an older version of Dapr. For up-to-date documentation, see the latest version.

使用输出绑定调用不同的资源

使用 Dapr 输出绑定调用外部系统

使用绑定,可以调用外部资源,而无需绑定到特定的 SDK 或库。 有关显示输出绑定的完整示例,请访问此 链接

示例:

以下的示例简述了一个订单处理程序。 在示例中,有一个订单处理服务,它具有 Dapr sidecar。 订单处理服务使用 Dapr 通过输出绑定调用外部资源,在本例中为 Kafka。

显示示例服务绑定的图示

1. 创建绑定

输出绑定表示 Dapr 将使用调用和向其发送消息的资源。

就本指南的目的,您将使用 Kafka 绑定。 您可以在 此处 找到不同绑定规范的列表。

创建一个名称为 checkout 的新绑定组件。

metadata 部分中,配置 Kafka 相关属性,如要将消息发布到其的 topic 和代理。


创建以下 YAML 文件,名为 binding.yaml,并将其保存到应用程序的 components 子文件夹中。 (使用具有 --components-path 标记 的 dapr run 命令来指向自定义组件目录)

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: checkout
spec:
  type: bindings.kafka
  version: v1
  metadata:
  # Kafka broker connection setting
  - name: brokers
    value: localhost:9092
  # consumer configuration: topic and consumer group
  - name: topics
    value: sample
  - name: consumerGroup
    value: group1
  # publisher configuration: topic
  - name: publishTopic
    value: sample
  - name: authRequired
    value: "false"

要将其部署到 Kubernetes 群集中,请为你想要的 绑定 组件 在下面的 yaml metadata 中填写链接详情,保存为 binding.yaml(在这里为kafka),然后运行 kubectl apply -f binding.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: checkout
spec:
  type: bindings.kafka
  version: v1
  metadata:
  # Kafka broker connection setting
  - name: brokers
    value: localhost:9092
  # consumer configuration: topic and consumer group
  - name: topics
    value: sample
  - name: consumerGroup
    value: group1
  # publisher configuration: topic
  - name: publishTopic
    value: sample
  - name: authRequired
    value: "false"

2. 发送事件(输出绑定)

下面是利用 Dapr SDK 与输出绑定交互的代码示例。


//dependencies
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using System.Threading;

//code
namespace EventService
{
    class Program
    {
        static async Task Main(string[] args)
        {
           string BINDING_NAME = "checkout";
           string BINDING_OPERATION = "create";
           while(true) {
                System.Threading.Thread.Sleep(5000);
                Random random = new Random();
                int orderId = random.Next(1,1000);
                using var client = new DaprClientBuilder().Build();
                //Using Dapr SDK to invoke output binding
                await client.InvokeBindingAsync(BINDING_NAME, BINDING_OPERATION, orderId);
                Console.WriteLine("Sending message: " + orderId);
            }
        }
    }
}

导航到包含上述代码的目录,然后运行以下命令以启动 Dapr sidecar 并运行该应用程序:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --app-ssl dotnet run

//dependencies
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import java.util.concurrent.TimeUnit;

//code
@SpringBootApplication
public class OrderProcessingServiceApplication {

    private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);

    public static void main(String[] args) throws InterruptedException{
        String BINDING_NAME = "checkout";
        String BINDING_OPERATION = "create";
        while(true) {
            TimeUnit.MILLISECONDS.sleep(5000);
            Random random = new Random();
            int orderId = random.nextInt(1000-1) + 1;
            DaprClient client = new DaprClientBuilder().build();
          //Using Dapr SDK to invoke output binding
            client.invokeBinding(BINDING_NAME, BINDING_OPERATION, orderId).block();
            log.info("Sending message: " + orderId);
        }
    }
}

导航到包含上述代码的目录,然后运行以下命令以启动 Dapr sidecar 并运行该应用程序:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run

#dependencies
import random
from time import sleep    
import requests
import logging
import json
from dapr.clients import DaprClient

#code
logging.basicConfig(level = logging.INFO)
BINDING_NAME = 'checkout'
BINDING_OPERATION = 'create' 
while True:
    sleep(random.randrange(50, 5000) / 1000)
    orderId = random.randint(1, 1000)
    with DaprClient() as client:
        #Using Dapr SDK to invoke output binding
        resp = client.invoke_binding(BINDING_NAME, BINDING_OPERATION, json.dumps(orderId))
    logging.basicConfig(level = logging.INFO)
    logging.info('Sending message: ' + str(orderId))

导航到包含上述代码的目录,然后运行以下命令以启动 Dapr sidecar 并运行该应用程序:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --app-protocol grpc python3 OrderProcessingService.py

//dependencies
import (
    "context"
    "log"
    "math/rand"
    "time"
    "strconv"
    dapr "github.com/dapr/go-sdk/client"

)

//code
func main() {
    BINDING_NAME := "checkout";
    BINDING_OPERATION := "create";
    for i := 0; i < 10; i++ {
        time.Sleep(5000)
        orderId := rand.Intn(1000-1) + 1
        client, err := dapr.NewClient()
        if err != nil {
            panic(err)
        }
        defer client.Close()
        ctx := context.Background()
        //Using Dapr SDK to invoke output binding
        in := &dapr.InvokeBindingRequest{ Name: BINDING_NAME, Operation: BINDING_OPERATION , Data: []byte(strconv.Itoa(orderId))}
        err = client.InvokeOutputBinding(ctx, in)
        log.Println("Sending message: " + strconv.Itoa(orderId))
    }
}

导航到包含上述代码的目录,然后运行以下命令以启动 Dapr sidecar 并运行该应用程序:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go

//dependencies

import { DaprServer, DaprClient, CommunicationProtocolEnum } from 'dapr-client'; 

//code
const daprHost = "127.0.0.1"; 

var main = function() {
    for(var i=0;i<10;i++) {
        sleep(5000);
        var orderId = Math.floor(Math.random() * (1000 - 1) + 1);
        start(orderId).catch((e) => {
            console.error(e);
            process.exit(1);
        });
    }
}

async function start(orderId) {
    const BINDING_NAME = "checkout";
    const BINDING_OPERATION = "create";
    const client = new DaprClient(daprHost, process.env.DAPR_HTTP_PORT, CommunicationProtocolEnum.HTTP);
    //Using Dapr SDK to invoke output binding
    const result = await client.binding.send(BINDING_NAME, BINDING_OPERATION, { orderId: orderId });
    console.log("Sending message: " + orderId);
}

function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

main();

导航到包含上述代码的目录,然后运行以下命令以启动 Dapr sidecar 并运行该应用程序:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start

注: 在 Kubernetes 中运行时,使用 kubectl apply -f binding.yaml 将此文件应用于您的集群

还可以使用 HTTP 调用输出绑定端点:

curl -X POST -H 'Content-Type: application/json' http://localhost:3601/v1.0/bindings/checkout -d '{ "data": { "orderId": "100" }, "operation": "create" }'

如上所示,您调用了 /binding 端点,其中包含要调用的绑定的名称,在我们的例子中,它是 checkout。 有效载荷位于必需的 data 字段中,并且可以是任何 JSON 可序列化的值。

您还会注意到,有一个 operation 字段告诉绑定您需要它执行的操作。 您可以查看 这里 查看每个输出绑定都支持的操作。

观看如何使用双向输出绑定的 视频

参考资料