# 流

**URL:** https://heroiclabs.com/docs/zh/nakama/server-framework/streams/
**Summary:** Nakama 的实时消息路由和传递子系统采取流形式。流将对某些消息类型感兴趣的客户端联系在一起，并可让 Nakama 的内部组件将消息传递给相关用户。

---


# 流

流是一个强大的功能，可以让您完全控制 Nakama 的内部实时路由和传递。

Nakama 的实时消息路由和传递子系统采取流形式。流将对某些消息类型感兴趣的客户联系在一起，并让 Nakama 的内部组件可将消息传递给相关用户。

客户端可以接收流的消息和数据，但不能自己直接加入、退出或发送数据。只有服务器[代码运行库](../)中才有这些功能。

服务器中的所有高级实时功能（[聊天频道](../../concepts/chat/)、[多人游戏](../../concepts/multiplayer/relayed/)、[通知](../../concepts/notifications/)等）都是流上的功能。使用这些功能并非必须懂得和使用流。

## 流的结构

流由两个组件定义：**流标识符**和**状态列表**。

### 流标识符

所有流都有其唯一 ID。此 ID 用于将用户放到流上，并定位流，进行消息传递。流 ID 有 4 个字段：

- **模式**表示流的类型。例如，聊天频道的名称各不相同，但却处于同一模式。
- **主题**包含主要流主题，通常为用户 ID。“模式”是唯一的必要字段。
- **描述符**为次级 ID。当流的作用域为一对用户或群组时使用，如两个用户之间的[直接聊天](../../concepts/chat/)。
- **标签**存储字符串，可以是元信息。按名称创建的聊天室使用标签字段。

### 状态列表

流以一组在线用户作为目标并向他们传递消息。每个流维护一个状态列表，该列表使用用户连接所用的套接字对用户进行唯一标识。当服务器向流发送消息时，消息将被传递给由状态标识的所有客户端。

### 持久性和消息历史

可用可选的持久性标志标记状态。服务器在处理传递到流的消息时可以观察此标志，以决定是否应将消息数据存储在数据库中。[实时聊天](../../concepts/chat/)功能使用此标志来决定是否应当存储消息，以便客户端可以请求消息历史。

### 隐藏流成员

当新用户加入或现有用户离开流时，流生成状态事件，通知流中当前的所有用户。可用可选的隐藏标记来对状态做标记。将此设置为真时，当此用户加入或离开时，服务器将不会生成状态事件。

隐藏的状态仍然完全是流成员，因此他们会正常接收数据和状态事件。

## 接收流数据

客户端可以注册一个事件处理程序，以便在通过套接字接收到时使用流数据对象。对于每条流数据消息都将调用一次处理程序函数。

{{< code type="client" >}}
```javascript
socket.onstreamdata = (streamdata) => {
	console.log("Received data from stream: %o", streamdata.stream.subject);
	console.log("Data content: %@.", streamdata.data);
};
```
{{< / code >}}

{{< code type="client" >}}
```csharp
socket.ReceivedStreamState += stream =>
{
	Console.WriteLine("Received data from stream: '{0}'", stream.Stream.Subject);
	Console.WriteLine("Data content: {0}", stream.State);
};
```
{{< / code >}}

{{< code type="client" framework="unity" >}}
```csharp
socket.ReceivedStreamState += stream =>
{
		Debug.LogFormat("Received data from stream: '{0}'", stream.Stream.Subject);
		Debug.LogFormat("Data content: {0}", stream.State);
};
```
{{< / code >}}

{{< code type="client" framework="cocos2d-x" >}}
```cpp
// add listener to header of your class: NRtDefaultClientListener listener;
rtClient->setListener(&listener);
listener.onStreamData([](const NStreamData& data)
{
		CCLOG("Received data from stream: %s", data.stream.subject.c_str());
		CCLOG("Data content: %s", data.data.c_str());
});
```
{{< / code >}}

{{< code type="client" framework="cocos2d-js" >}}
```javascript
socket.onstreamdata = function (streamdata) {
	cc.log("Received data from stream:", streamdata.stream.subject);
	cc.log("Data content:", streamdata.data);
};
```
{{< / code >}}

{{< code type="client" >}}
```cpp
// add listener to header of your class: NRtDefaultClientListener listener;
rtClient->setListener(&listener);
listener.onStreamData([](const NStreamData& data)
{
		cout << "Received data from stream: " << data.stream.subject << endl;
		cout << "Data content: " << data.data << endl;
});
```
{{< / code >}}

{{< code type="client" >}}
```java
SocketListener listener = new AbstractSocketListener() {
	@Override
	public void onStreamData(final StreamData data) {
		System.out.println("Received data from stream: " + data.getStream().getSubject());
		System.out.println("Data content: " + data.getData());
	}
};
```
{{< / code >}}

{{< code type="client" framework="godot3" >}}
```gdscript
func _ready():
	# First, setup the socket as explained in the authentication section.
	socket.connect("received_stream_state", self, "_on_stream_state")

func _on_stream_state(p_state : NakamaRTAPI.StreamData):
	print("Received data from stream: %s" % [p_state.stream])
	print("Data: %s" % [parse_json(p_state.state)])
```
{{< / code >}}

{{< code type="client" framework="defold" >}}
```lua
local result = socket.on_streamdata(function(message)
		pprint(message)
end)
```
{{< / code >}}

## 接收流状态事件

当新状态加入流或现有状态离开时，服务器将向流中当前的所有用户广播状态事件。

{{< code type="client" >}}
```javascript
socket.onstreampresence = (streampresence) => {
	console.log("Received presence event for stream: %o", streampresence.id);
	streampresence.joins.forEach((join) => {
		console.log("New user joined: %o", join.user_id);
	});
	streampresence.leaves.forEach((leave) => {
		console.log("User left: %o", leave.user_id);
	});
};
```
{{< / code >}}

{{< code type="client" >}}
```csharp
socket.ReceivedStreamPresence += presenceEvent =>
{
		Console.WriteLine("Received data from stream: '{0}'", presenceEvent.Stream.Subject);
		foreach (var joined in presenceEvent.Joins)
		{
				Console.WriteLine("Joined: {0}", joined);
		}
		foreach (var left in presenceEvent.Leaves)
		{
				Console.WriteLine("Left: {0}", left);
		}
};
```
{{< / code >}}

{{< code type="client" framework="unity" >}}
```csharp
socket.ReceivedStreamPresence += presenceEvent =>
{
		Debug.LogFormat("Received data from stream: '{0}'", presenceEvent.Stream.Subject);
		foreach (var joined in presenceEvent.Joins)
		{
				Debug.LogFormat("Joined: {0}", joined);
		}
		foreach (var left in presenceEvent.Leaves)
		{
				Debug.LogFormat("Left: {0}", left);
		}
};
```
{{< / code >}}

{{< code type="client" framework="cocos2d-x" >}}
```cpp
// add listener to header of your class: NRtDefaultClientListener listener;
rtClient->setListener(&listener);
listener.onStreamPresence([](const NStreamPresenceEvent& presence)
{
		CCLOG("Received presence event for stream: %s", presence.stream.subject.c_str());
		for (const NUserPresence& userPresence : presence.joins)
		{
			CCLOG("New user joined: %s", userPresence.user_id.c_str());
		}
		for (const NUserPresence& userPresence : presence.leaves)
		{
			CCLOG("User left: %s", userPresence.user_id.c_str());
		}
});
```
{{< / code >}}

{{< code type="client" framework="cocos2d-js" >}}
```javascript
socket.onstreampresence = function(streampresence) {
	cc.log("Received presence event for stream:", streampresence.id);
	streampresence.joins.forEach(function(join) {
		cc.log("New user joined:", join.user_id);
	});
	streampresence.leaves.forEach(function(leave) {
		cc.log("User left:", leave.user_id);
	});
};
```
{{< / code >}}

{{< code type="client" >}}
```cpp
// add listener to header of your class: NRtDefaultClientListener listener;
rtClient->setListener(&listener);
listener.onStreamPresence([](const NStreamPresenceEvent& presence)
{
		cout << "Received presence event for stream: " << presence.stream.subject << endl;
		for (const NUserPresence& userPresence : presence.joins)
		{
			cout << "New user joined: " << userPresence.user_id << endl;
		}
		for (const NUserPresence& userPresence : presence.leaves)
		{
			cout << "User left: " << userPresence.user_id << endl;
		}
});
```
{{< / code >}}

{{< code type="client" >}}
```java
SocketListener listener = new AbstractSocketListener() {
	@Override
	public void onStreamPresence(final StreamPresenceEvent presence) {
		System.out.println("Received presence event for stream: " + presence.getStream().getSubject());

		for (UserPresence userPresence : presence.getJoins()) {
			System.out.println("User ID: " + userPresence.getUserId() + " Username: " + userPresence.getUsername() + " Status: " + userPresence.getStatus());
		}

		for (UserPresence userPresence : presence.getLeaves()) {
			System.out.println("User ID: " + userPresence.getUserId() + " Username: " + userPresence.getUsername() + " Status: " + userPresence.getStatus());
		}
	}
};
```
{{< / code >}}

{{< code type="client" framework="godot3" >}}
```gdscript
func _ready():
	# First, setup the socket as explained in the authentication section.
	socket.connect("received_stream_presence", self, "_on_stream_presence")

func _on_stream_presence(p_presence : NakamaRTAPI.StreamPresenceEvent):
	print("Received presences on stream: %s" % [p_presence.stream])
	for p in p_presence.joins:
		print("User ID: %s, Username: %s, Status: %s" % [p.user_id, p.username, p.status])
	for p in p_presence.leaves:
		print("User ID: %s, Username: %s, Status: %s" % [p.user_id, p.username, p.status])
```
{{< / code >}}

{{< code type="client" framework="defold" >}}
```lua
local result = socket.on_streampresence(function(message)
		pprint(message.joins)
		pprint(message.leaves)
end)
```
{{< / code >}}

隐藏的状态不会生成状态事件，也不会出现在此事件处理程序接收到的结果中。

## 加入流

服务器可以将用户放在任意数量的流上。要将用户添加到流中，服务器需要用户的 ID、用户当前会话的唯一会话 ID 以及应将用户放置到的流的有关信息。

例如，我们可以注册一个 RPC 函数，该函数将把调用它的用户放置在自定义流中。

{{< code type="server" >}}
```lua
local function join(context, _)
	local stream_id = { mode = 123, label = "my custom stream" }
	local hidden = false
	local persistence = false
	nk.stream_user_join(context.user_id, context.session_id, stream_id, hidden, persistence)
end
nk.register_rpc(join, "join")
```
{{< / code >}}

{{< code type="server" >}}
```go
func JoinStream(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
	userID, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
	if !ok {
		// If user ID is not found, RPC was called without a session token.
		return "", errors.New("Invalid context")
	}
	sessionID, ok := ctx.Value(runtime.RUNTIME_CTX_SESSION_ID).(string)
	if !ok {
		// If session ID is not found, RPC was not called over a connected socket.
		return "", errors.New("Invalid context")
	}

	mode := 123
	hidden := false
	persistence := false
	if _, err := nk.StreamUserJoin(mode, "", "", "label", userID, sessionID, hidden, persistence, ""); err != nil {
		return "", err
	}

	return "Success", nil
}

// Register as RPC function, this call should be in InitModule.
if err := initializer.RegisterRpc("join", JoinStream); err != nil {
	logger.Error("Unable to register: %v", err)
	return err
}
```
{{< / code >}}

{{< code type="server" >}}
```typescript
let joinFunction: nkruntime.RpcFunction = function(ctx: nkruntime.Context, logger: nkruntime.Logger, nk: nkruntime.Nakama, payload: string) {
		let streamId: nkruntime.Stream = {
				mode: 123,
				label: 'my custom stream',
		};
		let hidden = false;
		let persistence = false;
		nk.streamUserJoin(ctx.userId, ctx.sessionId, streamId, hidden, persistence);
}

// Register as RPC function, this call should be in InitModule.
initializer.registerRpc('join', joinFunction);
```
{{< / code >}}

如果此用户+会话已经是流的成员，则操作将为 no-op。

## 退出流

退出流也由服务器控制。要从流中删除用户，服务器需要用户 ID、用户当前会话的唯一会话 ID 以及应从中删除用户的流的有关信息。

例如，我们可以注册一个 RPC 函数，该函数将从自定义流中删除调用它的用户。

{{< code type="server" >}}
```lua
local function leave(context, _)
	local stream_id = { mode = 123, label = "my custom stream" }
	nk.stream_user_leave(context.user_id, context.session_id, stream_id)
end
nk.register_rpc(leave, "leave")
```
{{< / code >}}

{{< code type="server" >}}
```go
// RPC Code
func LeaveStream(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
	userID, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
	if !ok {
		// If user ID is not found, RPC was called without a session token.
		return "", errors.New("Invalid context")
	}
	sessionID, ok := ctx.Value(runtime.RUNTIME_CTX_SESSION_ID).(string)
	if !ok {
		// If session ID is not found, RPC was not called over a connected socket.
		return "", errors.New("Invalid context")
	}

	if err := nk.StreamUserLeave(123, "", "", "label", userID, sessionID); err != nil {
		return "", err
	}

	return "Success", nil
}

// Register as RPC function, this call should be in InitModule.
if err := initializer.RegisterRpc("leave", LeaveStream); err != nil {
	logger.Error("Unable to register: %v", err)
	return err
}
```
{{< / code >}}

{{< code type="server" >}}
```typescript
let leaveFunction: nkruntime.RpcFunction = function(ctx: nkruntime.Context, logger: nkruntime.Logger, nk: nkruntime.Nakama, payload: string) {
		let streamId: nkruntime.Stream = {
				mode: 123,
				label: 'my custom stream',
		};
		nk.streamUserLeave(ctx.userId, ctx.sessionId, streamId);
}

// Register as RPC function, this call should be in InitModule.
initializer.registerRpc('leave', leaveFunction);
```
{{< / code >}}

如果此用户+会话不是流的成员，则操作将为 no-op。

就像[聊天频道](../../concepts/chat/)和[实时多人游戏比赛](../../concepts/multiplayer/relayed/)一样，在客户端断开连接时，会自动从其所在的流中将其删除。

## 向流发送数据

服务器可通过函数调用将数据发送到流。消息会传递到流中存在的所有用户。发送到流的消息有效载荷可以是任何字符串，但建议使用 JSON 等结构化格式。

{{< code type="server" >}}
```lua
local stream_id = { mode = 123, label = "my custom stream" }
local payload = nk.json_encode({ some = "data" })
nk.stream_send(stream_id, payload)
```
{{< / code >}}

{{< code type="server" >}}
```go
mode := uint8(123)
label := "label"
// Data does not have to be JSON, but it's a convenient format.
data := "{"some":"data"}"
nk.StreamSend(mode, "", "", label, data, nil)
```
{{< / code >}}

{{< code type="server" >}}
```typescript
let streamId: nkruntime.Stream = {
	mode: 123,
	label: 'my custom stream',
};
const payload = {"some": "data"};
nk.streamSend(streamId, payload);
```
{{< / code >}}

如流为空，则操作将是 no-op。


## 关闭流

关闭流会将当前其上的所有状态删除。这适用于显式关闭流，让服务器更快地收回资源。

{{< code type="server" >}}
```lua
local stream_id = { mode = 123, label = "my custom stream" }
nk.stream_close(stream_id)
```
{{< / code >}}

{{< code type="server" >}}
```go
mode := uint8(123)
label := "label"
nk.StreamClose(mode, "", "", label)
```
{{< / code >}}

{{< code type="server" >}}
```typescript
let streamId: nkruntime.Stream = {
		mode: 123,
		label: 'my custom stream',
};
nk.streamClose(streamId);
```
{{< / code >}}

## 流状态计数

服务器可以查看流上的状态以获得快速计数，而无需处理流的所有状态。

{{< code type="server" >}}
```lua
local stream_id = { mode = 123, label = "my custom stream" }
local count = nk.stream_count(stream_id)
```
{{< / code >}}

{{< code type="server" >}}
```go
mode := uint8(123)
label := "label"
count, err := nk.StreamCount(mode, "", "", label)
if err != nil {
	// Handle error here.
}
```
{{< / code >}}

{{< code type="server" >}}
```typescript
let streamId: nkruntime.Stream = {
		mode: 123,
		label: 'my custom stream',
};
let count = nk.streamCount(streamId);
```
{{< / code >}}

## 列出流状态

流状态列表包含当前在线并连接到该流的每个用户，以及他们所连接的会话 ID 和其他元数据的信息。

{{< code type="server" >}}
```lua
local stream_id = { mode = 123, label = "my custom stream" }
local presences = nk.stream_user_list(stream_id)

for _, presence in ipairs(presences) do
	nk.logger_info("Found user ID " .. presence.user_id)
end
```
{{< / code >}}

{{< code type="server" >}}
```go
mode := uint8(123)
label := "label"
includeHidden := true
includeNotHidden := true

members, err := nk.StreamUserList(mode, "", "", label, includeHidden, includeNotHidden)
if err != nil {
	// Handle error here
}

for _, m := range members {
	logger.Info("Found user: %s\n", m.GetUserId())
}
```
{{< / code >}}

{{< code type="server" >}}
```typescript
let streamId: nkruntime.Stream = {
		mode: 123,
		label: 'my custom stream',
};
let presences = nk.streamUserList(streamId);
presences?.forEach(function (p) {
		logger.info('Found user: %s\n', streamId);
});
```
{{< / code >}}

## 检查流状态

如果只需要一个用户，服务器可以检查流中是否存在该用户，并检索其状态和元数据。

例如，我们可以注册一个 RPC 函数，该函数将检查调用它的用户是否在自定义流中处于活动状态。

{{< code type="server" >}}
```lua
local function check(context, _)
	local stream_id = { mode = 123, label = "my custom stream" }
	local meta = nk.stream_user_get(context.user_id, context.session_id, stream_id)

	-- Meta is nil if the user is not present on the stream.
	if (meta) then
		nk.logger_info("User found on stream!")
	end
end
nk.register_rpc(check, "check")
```
{{< / code >}}

{{< code type="server" >}}
```go
func CheckStream(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
	userID, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
	if !ok {
		// If user ID is not found, RPC was called without a session token.
		return "", errors.New("Invalid context")
	}
	sessionID, ok := ctx.Value(runtime.RUNTIME_CTX_SESSION_ID).(string)
	if !ok {
		// If session ID is not found, RPC was not called over a connected socket.
		return "", errors.New("Invalid context")
	}

	mode := uint8(123)
	label := "label"

	if metaPresence, err := nk.StreamUserGet(mode, "", "", label, userID, sessionID); err != nil {
		// Handle error.
	} else if metaPresence != nil {
		logger.Info("User found on stream")
	} else {
		logger.Info("User not found on stream")
	}

	return "Success", nil
}

// Register as RPC function, this call should be in InitModule.
if err := initializer.RegisterRpc("check", CheckStream); err != nil {
	logger.Error("Unable to register: %v", err)
	return err
}
```
{{< / code >}}

{{< code type="server" >}}
```typescript
let streamId: nkruntime.Stream = {
		mode: 123,
		label: 'my custom stream',
};
let meta = nk.streamUserGet(ctx.userId, ctx.sessionId, streamId);
if (meta) {
		logger.info('User found on stream');
}
```
{{< / code >}}

## 内置流

[聊天频道](../../concepts/chat/)、[多人游戏](../../concepts/multiplayer/relayed/)和[通知](../../concepts/notifications/)等服务器实时功能在流之上构建。

通过了解这些流的结构，代码运行库可以权威地更改任何这些功能。

| 流              | 模式 | 主题  | 描述符 | 标签              | 信息                                                                                |
|---------------------|------|----------|------------|--------------------|--------------------------------------------------------------------------------------------|
| 通知       | 0    | 用户 ID  | -          | -                  | 控制应用程序内通知向连接的用户的传递。                          |
| 状态              | 1    | 用户 ID  | -          | -                  | 控制[状态](../../concepts/status/)功能并将更新广播到好友。 |
| 聊天频道        | 2    | -        | -          | “频道名称”     | 聊天频道成员资格。                                                              |
| 群组聊天          | 3    | 群组 ID | -          | -                  | 群组的私密聊天频道。                                                            |
| 直接消息      | 4    | 用户 ID  | 用户 ID    | -                  | 两个用户之间的私密直接消息对话。                                   |
| 中继的比赛       | 5    | 比赛 ID | -          | -                  | 中继的实时多人游戏比赛的成员资格和消息路由。                  |
| 权威比赛 | 6    | 比赛 ID | -          | “nakama 节点名称” | 权威实时多人游戏比赛的成员资格和消息路由。           |

将这些流标识符与上述函数一起使用就可以完全控制这些功能的内部行为。

### 示例：将用户踢出聊天频道

此代码将用户从聊天频道中删除。如用户有多个会话连接到频道，则将仅删除指定的一个会话。

{{< code type="server" >}}
```lua
local stream_id = { mode = 2, label = "some chat channel name" }
local user_id = "user ID to kick"
local session_id = "session ID to kick"
nk.stream_user_leave(user_id, session_id, stream_id)
```
{{< / code >}}

{{< code type="server" >}}
```go
mode := uint8(123)
label := "some chat room channel name"
userID := "user ID to kick"
sessionID := "session ID to kick"

if err := nk.StreamUserLeave(mode, "", "", label, userID, sessionID); err != nil {
	// Handle error.
}
```
{{< / code >}}

{{< code type="server" >}}
```typescript
let streamId: nkruntime.Stream = {
		mode: 123,
		label: 'my custom stream',
};
nk.streamUserLeave(ctx.userId, ctx.sessionId, streamId);
```
{{< / code >}}

### 示例：停止接收通知

用户可调用 RPC 函数，让其通知“静默”。之后即使他们继续在线，他们将不再能够接收到任何应用程序内通知的实时传送。

{{< code type="server" >}}
```lua
local function enable_silent_mode(context, _)
	local stream_id = { mode = 0, subject = context.user_id }
	nk.stream_user_leave(context.user_id, context.session_id, stream_id)
end
nk.register_rpc(enable_silent_mode, "enable_silent_mode")
```
{{< / code >}}

{{< code type="server" >}}
```go
func EnableSilentMode(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
	userID, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
	if !ok {
		// If user ID is not found, RPC was called without a session token.
		return "", errors.New("Invalid context")
	}
	sessionID, ok := ctx.Value(runtime.RUNTIME_CTX_SESSION_ID).(string)
	if !ok {
		// If session ID is not found, RPC was not called over a connected socket.
		return "", errors.New("Invalid context")
	}

	if err := nk.StreamUserLeave(0, userId, "", "", userID, sessionID); err != nil {
		// Handle error.
	}

	return "Success", nil
}

// Register as RPC function, this call should be in InitModule.
if err := initializer.RegisterRpc("enable_silent_mode", EnableSilentMode); err != nil {
	logger.Error("Unable to register: %v", err)
	return err
}
```
{{< / code >}}

{{< code type="server" >}}
```typescript
	let enableSilentModeFn: nkruntime.RpcFunction = function(ctx: nkruntime.Context, logger: nkruntime.Logger, nk: nkruntime.Nakama, payload: string) {
		let streamId: nkruntime.Stream = {
				mode: 123,
				label: 'my custom stream',
		};
		nk.streamUserLeave(ctx.userId, ctx.sessionId, streamId);
}

// Register as RPC function, this call should be in InitModule.
initializer.registerRpc('enable_silent_mode', enableSilentModeFn);
```
{{< / code >}}
