资产管理 #

以下示例演示了将Nakama用作资产管理系统,以便为客户端提供对安全云托管(Amazon S3)资产的临时访问权限。 这在许多情况下都非常有益。例如,您可以选择将游戏资产托管到私人AmazonS3存储桶中,然后允许客户端在运行时安全地下载这些资产。

使用资产管理系统有许多优势,例如:

  • 资产可保持私密状态,按需提供,且有时间限制
  • 可为客户端提供不同资产(例如beta访问资产)
  • 减少为进行某些事项(如列出资产)而与您的云存储提供商之间的往返行程
  • 允许用户生成的内容和资产验证

该系统包括3个主要部分:

  • 资产清单,使用Nakama存储引擎进行分类和存储
  • 资产上传RPC,负责获取二进制数据并将其上传到云,以及存储相应的资产清单
  • 资产下载RPC,为请求的资产生成一个预签名的临时下载URL

该系统与客户端无关,但在本指南的最后,您将了解如何使用它来实现安全的资产交付系统,从而直接从私人S3存储桶提供对Unity可寻址资产的运行时访问。

资产清单 #

使用Nakama存储引擎,您可以存储包含已上传特定资产信息的资产清单,包括:

  • 名称
  • 类别
  • 所有者
  • md5 checksum
  • 创建的时间戳

在Nakama中存储资产清单允许客户端检索单个记录或检索记录列表(例如按类别),而无需多次往返云存储提供商,因为这可能会产生成本。

您可以使用标准结构定义资产清单和其他相关消息,也可以使用特定的消息格式,如协议缓冲区,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
syntax = "proto3";

package api;

option go_package = "heroiclabs.com/nakama-amazon-s3/api";

message RpcAssetUploadRequest {
  string category = 1;
  string name = 2;
  bytes data = 3;
}

message RpcAssetUploadResponse {
  AssetManifest manifest = 1;
}

message RpcAssetDownloadRequest {
  string category = 1;
  string name = 2;
}

message RpcAssetDownloadResponse {
  string name = 1;
  string download_uri = 2;
}

message RpcAllAssetsDownloadResponse {
  repeated RpcAssetDownloadResponse assets = 1;
}

message AssetManifest {
  string category = 1;
  string name = 2;
  string owner = 3;
  string md5 = 4;
  int64 created_at = 5;
}
如果您正在使用协议缓冲区,您还需要使用protoc工具从.proto文件中为您的编程语言生成适当的文件。请查阅协议缓冲区基础知识:Go文档了解更多信息。

启动引导服务器运行时 #

在实现资产上传/下载RPC之前,您需要初始化协议缓冲区编排和S3服务对象以及上传工具。

您可以通过local.yml配置文件下的runtime.env章节将环境变量传递给Nakama。查看配置文档了解更多信息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func InitModule(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, initializer runtime.Initializer) error {
  initStart := time.Now()
  env := ctx.Value(runtime.RUNTIME_CTX_ENV).(map[string]string)

  // Ensure AWS environment vars have been configured
  awsAccessKeyID, ok := env["AWS_ACCESS_KEY_ID"]
  if !ok {
    return errors.New("missing AWS_ACCESS_KEY_ID in runtime env")
  }
  awsSecretAccessKey, ok := env["AWS_SECRET_ACCESS_KEY"]
  if !ok {
    return errors.New("missing AWS_SECRET_ACCESS_KEY in runtime env")
  }
  awsRegion, ok := env["AWS_REGION"]
  if !ok {
    return errors.New("missing AWS_REGION in runtime env")
  }
  awsBucket, ok := env["AWS_BUCKET"]
  if !ok {
    return errors.New("missing AWS_BUCKET in runtime env")
  }

  // Create protobuf marshalers
  marshaler := &jsonpb.Marshaler{
    EnumsAsInts: true,
  }

  unmarshaler := &jsonpb.Unmarshaler{
    AllowUnknownFields: false,
  }

  // Create an S3 service object and uploader
  sess := session.Must(session.NewSession(&aws.Config{
    Region:      aws.String(awsRegion),
    Credentials: credentials.NewStaticCredentials(awsAccessKeyID, awsSecretAccessKey, ""),
  }))

  svc := s3.New(sess)
  uploader := s3manager.NewUploader(sess)

  // Register RPCs here...

  logger.Info("Plugin loaded in '%d' msec.", time.Since(initStart).Milliseconds())
  return nil
}

资产上传RPC #

定义资产清单后,您可以实现一个RPC函数,该函数将接收编码为base64字符串的二进制数据,创建并存储资产清单,并将二进制文件上传到私人Amazon S3存储桶中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
func UploadAssetToS3(marshaler *jsonpb.Marshaler, unmarshaler *jsonpb.Unmarshaler, uploader *s3manager.Uploader, bucketName string) func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
  return func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
    // Check if s3 storage client is configured or return error
    if uploader == nil {
      return "", errors.New("s3 uploader not configured")
    }

    if len(bucketName) == 0 {
      return "", errors.New("no s3 bucket name provided")
    }

    // Check user that's making the request
    userID, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
    if !ok {
      return "", errors.New("user ID not found")
    }

    // Unmarshal the upload request input
    request := &api.RpcAssetUploadRequest{}

    logger.Info("got payload: %q", payload)

    if err := unmarshaler.Unmarshal(bytes.NewReader([]byte(payload)), request); err != nil {
      logger.WithField("error", err).Error("error unmarshalling RpcAssetUploadRequest")
      return "", errors.New("error unmarshalling RpcAssetUploadRequest")
    }

    // Check that the input fields are valid
    if request.Category == "" {
      return "", errors.New("asset Category required")
    }

    if request.Name == "" {
      return "", errors.New("asset Name required")
    }

    if len(request.Data) == 0 {
      return "", errors.New("asset Data required")
    }

    // Upload the asset to s3 bucket
    result, err := uploader.Upload(&s3manager.UploadInput{
      Bucket: aws.String(bucketName),
      Key:    aws.String(request.Name),
      Body:   bytes.NewReader(request.Data),
    })
    if err != nil {
      logger.WithField("error", err).Error("failed to upload file")
      return "", errors.New("failed to upload file")
    }

    // Prepare the asset manifest
    md5 := strings.ReplaceAll(*result.ETag, """, "")
    manifest := &api.AssetManifest{
      Category:  request.Category,
      Name:      request.Name,
      Owner:     userID,
      Md5:       md5,
      CreatedAt: time.Now().Unix(),
    }

    buf := &bytes.Buffer{}
    if err := marshaler.Marshal(buf, manifest); err != nil {
      logger.WithField("error", err).Error("error encoding asset manifest")
      return "", errors.New("error encoding asset manifest")
    }

    // Write the asset manifest to nakama storage
    if \_, err := nk.StorageWrite(ctx, []*runtime.StorageWrite{{
      Collection:      fmt.Sprintf("asset\_%s", request.Category),
      Key:             request.Name,
      UserID:          "", // Will be owned by the system user
      Value:           buf.String(),
      PermissionRead:  2, // Clients can read directly
      PermissionWrite: 0, // Only server can write
    }}); err != nil {
      logger.WithField("error", err).Error("error writing asset manifest")
      return "", errors.New("error writing asset manifest")
    }

    // Prepare response to the client
    response := &api.RpcAssetUploadResponse{
      Manifest: manifest,
    }

    buf = &bytes.Buffer{}
    if err := marshaler.Marshal(buf, response); err != nil {
      logger.WithField("error", err).Error("error encoding response")
      return "", errors.New("error encoding response")
    }

    return buf.String(), nil
  }
}

然后您必须在InitModule函数中注册此RPC。

1
2
3
if err := initializer.RegisterRpc("upload_asset", UploadAssetToS3(marshaler, unmarshaler, uploader, awsBucket)); err != nil {
  return err
}

资产下载RPC #

资产下载RPC将检查所请求文件的现有资产清单,找到清单后,将与Amazon S3对话并生成资产的预签名临时下载URL,然后将其传递回客户端。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
func GetAssetDownloadUri(marshaler *jsonpb.Marshaler, unmarshaler *jsonpb.Unmarshaler, svc *s3.S3, bucketName string) func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
  return func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
    // Check user that's making the request
    _, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
    if !ok {
      // Would happen if calling this RPC server-to-server. See: https://heroiclabs.com/docs/nakama/server-framework/introduction/#server-to-server
      // We're choosing not to allow this here, downloads are expected to be done by a user.
      return "", errors.New("user ID not found")
    }

    // Unmarshal the input
    request := &api.RpcAssetDownloadRequest{}
    if err := unmarshaler.Unmarshal(bytes.NewReader([]byte(payload)), request); err != nil {
      logger.WithField("error", err).Error("error unmarshalling request")
      return "", errors.New("error unmarshalling request")
    }

    // Check input fields are valid
    if request.Category == "" {
      return "", errors.New("asset Category required")
    }

    if request.Name == "" {
      return "", errors.New("asset Name required")
    }

    // Look up the asset in the storage manifest
    objects, err := nk.StorageRead(ctx, []*runtime.StorageRead{{
      Collection: fmt.Sprintf("asset_%s", request.Category),
      Key:        request.Name,
      UserID:     "", // Owned by the system user
    }})

    if err != nil {
      logger.WithField("error", err).Error("error reading asset manifest")
      return "", errors.New("error reading asset manifest")
    }

    // Check if the object exists
    if len(objects) == 0 {
      return "", errors.New("asset not found")
    }

    // Get the asset manifest
    manifest := &api.AssetManifest{}
    if err := unmarshaler.Unmarshal(bytes.NewReader([]byte(objects[0].Value)), manifest); err != nil {
      logger.WithField("error", err).Error("error unmarshaling manifest")
      return "", errors.New("error unmarshaling manifest")
    }

    // Prepare a short-lived asset download link
    assetDownload, err := getAssetDownloadUri(request.Name, svc, bucketName)
    if err != nil {
      logger.WithField("error", err).Error(err.Error())
      return "", err
    }

    // Prepare response to client
    buf := &bytes.Buffer{}
    if err := marshaler.Marshal(buf, assetDownload); err != nil {
      logger.WithField("error", err).Error("error encoding response")
      return "", errors.New("error encoding response")
    }

    logger.Info("Responding with:\n%q", buf.String())

    return buf.String(), nil
  }
}

func getAssetDownloadUri(key string, svc *s3.S3, bucketName string) (*api.RpcAssetDownloadResponse, error) {
  // Prepare a short-lived asset download link
  req, _ := svc.GetObjectRequest(&s3.GetObjectInput{
    Bucket: aws.String(bucketName),
    Key:    aws.String(key),
  })

  uri, err := req.Presign(24 * time.Hour)
  if err != nil {
    return nil, errors.New("error signing request")
  }

  response := &api.RpcAssetDownloadResponse{
    Name:        key,
    DownloadUri: uri,
  }

  return response, nil
}

之后,必须在InitModule函数中注册此RPC。

1
2
3
if err := initializer.RegisterRpc("get_asset_download_uri", GetAssetDownloadUri(marshaler, unmarshaler, svc, awsBucket)); err != nil {
  return err
}

按类别资产下载RPC #

只需提供您希望下载的资产类别,就可以获得资产下载URL列表。下面我们创建一个新的RPC来生成指定类别资产的预签名下载URL列表。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func GetAllAssetDownloadUris(marshaler *jsonpb.Marshaler, unmarshaler *jsonpb.Unmarshaler, svc *s3.S3, bucketName string) func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
  return func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
    // Check user that's making the request
    _, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
    if !ok {
      // Would happen if calling this RPC server-to-server. See: https://heroiclabs.com/docs/nakama/server-framework/introduction/#server-to-server
      // We're choosing not to allow this here, downloads are expected to be done by a user.
      return "", errors.New("user ID not found")
    }

    // Unmarshal the input
    request := &api.RpcAssetDownloadRequest{}
    if err := unmarshaler.Unmarshal(bytes.NewReader([]byte(payload)), request); err != nil {
      logger.WithField("error", err).Error("error unmarshalling request")
      return "", errors.New("error unmarshalling request")
    }

    // Check input fields are valid
    if request.Category == "" {
      return "", errors.New("asset Category required")
    }

    // Look up the asset in the storage manifest
    objects, \_, err := nk.StorageList(ctx, "", fmt.Sprintf("asset\_%s", request.Category), 100, "")
    if err != nil {
      logger.WithField("error", err).Error("error reading asset manifest")
      return "", errors.New("error reading asset manifest")
    }

    // Check if any objects exist
    if len(objects) == 0 {
      return "", errors.New("no assets found")
    }

    // Create a response object
    response := &api.RpcAllAssetsDownloadResponse{}

    // Loop through all assets and get a download uri
    for _, object := range objects {
      assetDownload, err := getAssetDownloadUri(object.Key, svc, bucketName)
      if err != nil {
        logger.WithField("error", err).Error(err.Error())
        return "", err
      }

      response.Assets = append(response.Assets, assetDownload)
    }

    // Marshal the response
    buf := &bytes.Buffer{}
    if err := marshaler.Marshal(buf, response); err != nil {
      logger.WithField("error", err).Error("error encoding response")
      return "", errors.New("error encoding response")
    }

    return buf.String(), nil
  }
}

然后您必须在InitModule函数中注册此RPC。

1
2
3
if err := initializer.RegisterRpc("get_all_asset_download_uris_by_category", GetAllAssetDownloadUris(marshaler, unmarshaler, svc, awsBucket)); err != nil {
  return err
}

示例:将资产管理系统与Unity可寻址集成 #

上述资产管理系统可以与远程资产交付管道(如Unity的可寻址功能)集成。 您可以使用Unity可寻址来打包资产,将其远程托管,然后在运行时加载这些资产。

Unity可寻址与Amazon S3、Google文件云存储或Digital Ocean对象存储等公共云存储开箱即用。遗憾的是,它不支持私人云存储开箱即用。但是,只需几个简单的步骤就可以集成我们的Nakama资产管理系统。

首先,需要将您的可寻址资产配置设置为从远程路径构建和加载,如下所示。确保您还勾选了构建远程目录选项。

配置可寻址以远程构建和加载
配置可寻址以远程构建和加载

您可以从可寻址资料窗口为您的可寻址资产配置精确的远程加载路径。但是,您可以在此处输入您喜欢的任何地址,这个地址应该以http(s)开头,因为它将在运行时被您稍后添加的代码替换掉。

配置可寻址远程加载路径
配置可寻址远程加载路径

配置可寻址系统后,通过单击可寻址群组窗口中的创建按钮来构建可寻址资产包。如果您首次构建您的资产,选择新建,否则选择更新先前构建(这可以确保使用相同的目录文件,从而允许客户端下载最新的资产,而无需更新客户端)。然后,您可以使用本指南前面提到的asset_uploadRPC将这些资产上传到您的Nakama服务器。您可以决定如何完成这一操作,建议您编写一个编辑器脚本,可以通过执行该脚本来上传资产。相关示例如下所示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
var directoryInfo = new DirectoryInfo("<PathToAssetBundle>/StandaloneWindows64");
foreach (var fileInfo in directoryInfo.GetFiles())
{
  var fileBytes = UnityEngine.Windows.File.ReadAllBytes(fileInfo.FullName);
  var request = new RpcAssetUploadRequest
  {
    category = "StandaloneWindows64",
    name = fileInfo.Name,
    data = Convert.ToBase64String(fileBytes)
  };
  await Client.RpcAsync(Session, "upload_asset", JsonUtility.ToJson(request));
}

要想开始使用Unity实施Nakama资产管理系统,请将以下类别定义添加到您的项目中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[Serializable]
public class RpcAssetUploadRequest
{
  public string category;
  public string name;
  public string data;
}

[Serializable]
public class RpcAssetDownloadRequest
{
  public string category;
  public string name;
}

[Serializable]
public class RpcAssetDownloadResponse
{
  public string name;
  public string downloadUri;
}

[Serializable]
public class RpcGetAllAssetsDownloadResponse
{
  public RpcAssetDownloadResponse[] assets;
}

然后,创建一个名为AddressablesManager的脚本,开始时,检索相关的(当前平台的)Unity资产目录(hash/json)以及来自Nakama的资产包。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
private IDictionary<string, string> assetPresignedUris = new Dictionary<string, string>();

private async void Start()
{
  await FetchPresignedAssets("StandaloneWindows64");
}

private async Task FetchPresignedAssets(string buildTarget)
{
  var payload = new RpcAssetDownloadRequest { category = buildTarget };
  var response = await Client.RpcAsync(Session, "get_all_asset_download_uris_by_category", JsonUtility.ToJson(payload));
  var result = JsonUtility.FromJson<RpcGetAllAssetsDownloadResponse>(response.Payload);

  foreach (var asset in result.assets)
  {
    assetPresignedUris.Add(asset.name, asset.downloadUri);
  }
}

接下来,采用Start方法为可寻址系统的ResourceManager.InternalIdTransformFunc分配一个新的变换函数,该方法将获取内部资产ID并将其转换为我们预签名的资产下载URL。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private async void Start()
{
  // ...
  Addressables.ResourceManager.InternalIdTransformFunc += TransformFunc;
}

private string TransformFunc(IResourceLocation location)
{
  // If we're trying to load a remote asset over http(s):
  // - grab the file name from the url path
  // - look up the pre-signed url and return that if available; otherwise just return the same location.InternalId
  var regex = new Regex("https?.+/([^?]+)");
  var match = regex.Match(location.InternalId);
  if (match.Success)
  {
    var key = match.Groups[1].Value;
    if (assetPresignedUris.ContainsKey(key))
    {
      return assetPresignedUris[key];
    }
  }

  return location.InternalId;
}

最后,像往常一样将您的可寻址资产实例化。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
private async void Start()
{
  // ...
  Addressables.InitializeAsync().Completed += OnInitializeAsyncCompleted;
}

private void OnInitializeAsyncCompleted(AsyncOperationHandle<IResourceLocator> obj)
{
  // e.g.
  // addressablePrefabReference.InstantiateAsync().Completed += go =>
  // {
  //  Instantiate(go.Result);
  // };
}

现在,您应该能够构建和运行您的应用程序,并查看加载的私有托管可寻址资产。

运行时私有托管的可寻址资产
运行时私有托管的可寻址资产