Streams #

Streams are a powerful feature that gives you full control over Nakama’s internal real-time routing and delivery.

Nakama’s real-time message routing and delivery subsystem is organized into streams. Streams tie together clients interested in certain message types and allow Nakama’s internal components to deliver messages to relevant users.

Clients may receive messages and data from streams, but are not allowed to directly join, leave, or send data themselves. These functions are only available in the server code runtime.

All of the higher-level real-time features in the server (chat channels, multiplayer, notifications, etc.) are built on top of streams. You do not need to understand or use streams directly to use these features.

Structure of a stream #

Streams are defined by two components: a stream identifier and a presence list.

Stream identifier #

All streams have their own unique id. This is used to place users onto streams and locate streams for message delivery. A stream id has 4 fields:

  • Mode marks the type of stream. For example, chat channels have different names but the same mode. Stream modes must be in the range 0-255. Mode is the only required field.
  • Subject contains the primary stream subject, usually a user ID.
  • Subcontext is a secondary ID, used when a stream is scoped to a pair of users or groups, as with direct chat between two users.
  • Label stores a string that can carry meta-information. For example, a chat room created by name uses the label field.

There are several built-in streams available. Note that the mode values used by these streams are reserved and cannot be used for custom streams.
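As a rough illustration of the identifier rules above, the sketch below models the four fields as a plain TypeScript type and checks a mode against the 0-255 range and the reserved values. It is a standalone model rather than Nakama runtime code: `StreamId`, `RESERVED_MODES`, and `validateCustomStream` are hypothetical names, and the reserved list (0 through 7) is taken from the built-in streams table on this page.

```typescript
// Standalone sketch: these names are illustrative and not part of the Nakama API.
interface StreamId {
  mode: number;        // required, 0-255
  subject?: string;    // primary subject, usually a user ID
  subcontext?: string; // secondary ID, e.g. the other user in a direct chat
  label?: string;      // free-form string such as a chat room name
}

// Modes used by the built-in streams listed in the "Built-in streams" table.
const RESERVED_MODES: number[] = [0, 1, 2, 3, 4, 5, 6, 7];

// Returns a list of problems; an empty list means the ID is usable for a custom stream.
function validateCustomStream(id: StreamId): string[] {
  const problems: string[] = [];
  if (Math.floor(id.mode) !== id.mode || id.mode < 0 || id.mode > 255) {
    problems.push("mode must be an integer in the range 0-255");
  } else if (RESERVED_MODES.indexOf(id.mode) !== -1) {
    problems.push("mode " + id.mode + " is reserved by a built-in stream");
  }
  return problems;
}

console.log(validateCustomStream({ mode: 100, label: "Lobby" })); // no problems
console.log(validateCustomStream({ mode: 0 }));                   // reserved mode
console.log(validateCustomStream({ mode: 300 }));                 // out of range
```

Only the mode is checked here because it is the only required field; the other three fields are left as optional strings.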

Presence list #

Streams are a way to address a set of online users and deliver messages to them. Each stream maintains a list of presences that uniquely identify a user with the socket they’re connected on. When the server sends a message to a stream it will be delivered to all clients identified by the presences.
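The idea of a presence as a user paired with a socket can be sketched with a small in-memory model. This is not how Nakama implements presence tracking internally; `PresenceList`, `add`, and `deliver` are hypothetical names chosen for the example.

```typescript
// Standalone model of a presence list; not Nakama's internal implementation.
interface Presence {
  userId: string;
  sessionId: string; // identifies the socket this user is connected on
}

class PresenceList {
  // Keyed by user ID + session ID so each connected socket appears once.
  private presences: { [key: string]: Presence } = {};

  // Adds a presence. Returns false if this user+session is already tracked.
  add(p: Presence): boolean {
    const key = p.userId + "/" + p.sessionId;
    if (Object.prototype.hasOwnProperty.call(this.presences, key)) {
      return false;
    }
    this.presences[key] = p;
    return true;
  }

  // Calls `send` once per presence and returns the session IDs reached.
  deliver(message: string, send: (p: Presence, message: string) => void): string[] {
    const reached: string[] = [];
    Object.keys(this.presences).forEach((key) => {
      const p = this.presences[key];
      send(p, message);
      reached.push(p.sessionId);
    });
    return reached;
  }
}

// One user connected on two sockets counts as two presences; both get the message.
const list = new PresenceList();
list.add({ userId: "user-a", sessionId: "socket-1" });
list.add({ userId: "user-a", sessionId: "socket-2" });
list.add({ userId: "user-b", sessionId: "socket-3" });
const reached = list.deliver("hello", (p, msg) => console.log(p.sessionId, "<-", msg));
console.log(reached.length); // 3
```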

Persistence and message history #

Presences may be marked with an optional persistence flag. The server can observe this flag when handling messages delivered to a stream to decide whether the message data should be stored in the database. The real-time chat feature uses this flag to decide if messages should be stored so clients can request message history.
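A minimal sketch of how server code might act on the persistence flag, assuming the flag consulted is the one on the sender's presence. `handleStreamMessage` and the array used as a store are hypothetical stand-ins, not part of the Nakama runtime.

```typescript
// Standalone model; the names below are illustrative and not Nakama API.
interface Presence {
  userId: string;
  sessionId: string;
  persistence: boolean;
}

interface StoredMessage {
  senderId: string;
  content: string;
}

// Handles a message sent by `sender`. `store` stands in for a database table.
// Returns true when the message was stored for later history requests.
function handleStreamMessage(sender: Presence, content: string, store: StoredMessage[]): boolean {
  // Real-time delivery to the stream would happen here regardless of the flag.
  if (!sender.persistence) {
    return false;
  }
  store.push({ senderId: sender.userId, content: content });
  return true;
}

const history: StoredMessage[] = [];
handleStreamMessage({ userId: "user-a", sessionId: "s1", persistence: true }, "kept", history);
handleStreamMessage({ userId: "user-b", sessionId: "s2", persistence: false }, "transient", history);
console.log(history.length); // 1
```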

Hidden stream members #

Streams generate presence events that notify all users currently in the stream when a new user joins or an existing user leaves the stream. Presences may be marked with an optional hidden flag. When this is set to true the server will not generate presence events when this user joins or leaves.

Hidden presences are still full stream members so they do receive data and presence events as normal.
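The two rules above (hidden presences join silently, yet still receive events) can be modelled in a few lines. This is an illustrative standalone sketch; `onJoin` and `Broadcast` are hypothetical names and do not correspond to Nakama runtime functions.

```typescript
// Standalone model of hidden presences; names are illustrative, not Nakama API.
interface Presence {
  userId: string;
  sessionId: string;
  hidden: boolean;
}

interface PresenceEvent {
  joins: Presence[];
  leaves: Presence[];
}

interface Broadcast {
  recipients: Presence[]; // who receives the event
  event: PresenceEvent;
}

// Builds the broadcast for a join, or returns null when no event is generated.
function onJoin(members: Presence[], joiner: Presence): Broadcast | null {
  if (joiner.hidden) {
    return null; // hidden presences join silently
  }
  // Hidden members are still full members, so they are recipients too.
  return { recipients: members.slice(), event: { joins: [joiner], leaves: [] } };
}

const members: Presence[] = [
  { userId: "moderator", sessionId: "s1", hidden: true },
  { userId: "player", sessionId: "s2", hidden: false },
];
const visible = onJoin(members, { userId: "newcomer", sessionId: "s3", hidden: false });
console.log(visible ? visible.recipients.length : 0); // 2
const silent = onJoin(members, { userId: "observer", sessionId: "s4", hidden: true });
console.log(silent); // null
```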

Receiving stream data #

Clients can register an event handler to consume stream data objects when received over the socket. The handler function will be called once for each stream data message.

Client
socket.onstreamdata = (streamdata) => {
	console.log("Received data from stream: %o", streamdata.stream.subject);
	console.log("Data content: %o", streamdata.data);
};
Client
socket.ReceivedStreamState += stream =>
{
	Console.WriteLine("Received data from stream: '{0}'", stream.Stream.Subject);
	Console.WriteLine("Data content: {0}", stream.State);
};
Client
socket.ReceivedStreamState += stream =>
{
		Debug.LogFormat("Received data from stream: '{0}'", stream.Stream.Subject);
		Debug.LogFormat("Data content: {0}", stream.State);
};
Client
// add listener to header of your class: NRtDefaultClientListener listener;
rtClient->setListener(&listener);
listener.onStreamData([](const NStreamData& data)
{
		CCLOG("Received data from stream: %s", data.stream.subject.c_str());
		CCLOG("Data content: %s", data.data.c_str());
});
Client
socket.onstreamdata = function (streamdata) {
	cc.log("Received data from stream:", streamdata.stream.subject);
	cc.log("Data content:", streamdata.data);
};
Client
// add listener to header of your class: NRtDefaultClientListener listener;
rtClient->setListener(&listener);
listener.onStreamData([](const NStreamData& data)
{
		cout << "Received data from stream: " << data.stream.subject << endl;
		cout << "Data content: " << data.data << endl;
});
Client
SocketListener listener = new AbstractSocketListener() {
	@Override
	public void onStreamData(final StreamData data) {
		System.out.println("Received data from stream: " + data.getStream().getSubject());
		System.out.println("Data content: " + data.getData());
	}
};
Client
func _ready():
	# First, setup the socket as explained in the authentication section.
	socket.connect("received_stream_state", self, "_on_stream_state")

func _on_stream_state(p_state : NakamaRTAPI.StreamData):
	print("Received data from stream: %s" % [p_state.stream])
	print("Data: %s" % [parse_json(p_state.state)])
Client
func _ready():
	# First, setup the socket as explained in the authentication section.
	socket.received_stream_state.connect(self._on_stream_state)

func _on_stream_state(p_state : NakamaRTAPI.StreamData):
	print("Received data from stream: %s" % [p_state.stream])
	print("Data: %s" % [JSON.parse_string(p_state.state)])
Client
local result = socket.on_streamdata(function(message)
		pprint(message)
end)

Receiving stream presence events #

When a new presence joins a stream or an existing presence leaves, the server broadcasts a presence event to all users currently on the stream.

Client
socket.onstreampresence = (streampresence) => {
	console.log("Received presence event for stream: %o", streampresence.id);
	streampresence.joins.forEach((join) => {
		console.log("New user joined: %o", join.user_id);
	});
	streampresence.leaves.forEach((leave) => {
		console.log("User left: %o", leave.user_id);
	});
};
Client
socket.ReceivedStreamPresence += presenceEvent =>
{
		Console.WriteLine("Received presence event for stream: '{0}'", presenceEvent.Stream.Subject);
		foreach (var joined in presenceEvent.Joins)
		{
				Console.WriteLine("Joined: {0}", joined);
		}
		foreach (var left in presenceEvent.Leaves)
		{
				Console.WriteLine("Left: {0}", left);
		}
};
Client
socket.ReceivedStreamPresence += presenceEvent =>
{
		Debug.LogFormat("Received presence event for stream: '{0}'", presenceEvent.Stream.Subject);
		foreach (var joined in presenceEvent.Joins)
		{
				Debug.LogFormat("Joined: {0}", joined);
		}
		foreach (var left in presenceEvent.Leaves)
		{
				Debug.LogFormat("Left: {0}", left);
		}
};
Client
// add listener to header of your class: NRtDefaultClientListener listener;
rtClient->setListener(&listener);
listener.onStreamPresence([](const NStreamPresenceEvent& presence)
{
		CCLOG("Received presence event for stream: %s", presence.stream.subject.c_str());
		for (const NUserPresence& userPresence : presence.joins)
		{
			CCLOG("New user joined: %s", userPresence.user_id.c_str());
		}
		for (const NUserPresence& userPresence : presence.leaves)
		{
			CCLOG("User left: %s", userPresence.user_id.c_str());
		}
});
Client
socket.onstreampresence = function(streampresence) {
	cc.log("Received presence event for stream:", streampresence.id);
	streampresence.joins.forEach(function(join) {
		cc.log("New user joined:", join.user_id);
	});
	streampresence.leaves.forEach(function(leave) {
		cc.log("User left:", leave.user_id);
	});
};
Client
// add listener to header of your class: NRtDefaultClientListener listener;
rtClient->setListener(&listener);
listener.onStreamPresence([](const NStreamPresenceEvent& presence)
{
		cout << "Received presence event for stream: " << presence.stream.subject << endl;
		for (const NUserPresence& userPresence : presence.joins)
		{
			cout << "New user joined: " << userPresence.user_id << endl;
		}
		for (const NUserPresence& userPresence : presence.leaves)
		{
			cout << "User left: " << userPresence.user_id << endl;
		}
});
Client
SocketListener listener = new AbstractSocketListener() {
	@Override
	public void onStreamPresence(final StreamPresenceEvent presence) {
		System.out.println("Received presence event for stream: " + presence.getStream().getSubject());

		for (UserPresence userPresence : presence.getJoins()) {
			System.out.println("User ID: " + userPresence.getUserId() + " Username: " + userPresence.getUsername() + " Status: " + userPresence.getStatus());
		}

		for (UserPresence userPresence : presence.getLeaves()) {
			System.out.println("User ID: " + userPresence.getUserId() + " Username: " + userPresence.getUsername() + " Status: " + userPresence.getStatus());
		}
	}
};
Client
func _ready():
	# First, setup the socket as explained in the authentication section.
	socket.received_stream_presence.connect(self._on_stream_presence)

func _on_stream_presence(p_presence : NakamaRTAPI.StreamPresenceEvent):
	print("Received presences on stream: %s" % [p_presence.stream])
	for p in p_presence.joins:
		print("User ID: %s, Username: %s, Status: %s" % [p.user_id, p.username, p.status])
	for p in p_presence.leaves:
		print("User ID: %s, Username: %s, Status: %s" % [p.user_id, p.username, p.status])
Client
local result = socket.on_streampresence(function(message)
		pprint(message.joins)
		pprint(message.leaves)
end)

Hidden presences do not generate presence events and won’t appear in results received by this event handler.

Join a stream #

The server can place users on any number of streams. To add a user to a stream the server needs the user’s ID, the unique session ID of the user’s current session, and information about the stream they should be placed on.

As an example we can register an RPC function that will place the user that calls it on a custom stream.

Server
local function join(context, _)
	local stream_id = { mode = 2, label = "Global Chat Room" }
	local hidden = false
	local persistence = false
	nk.stream_user_join(context.user_id, context.session_id, stream_id, hidden, persistence)
end
nk.register_rpc(join, "join")
Server
func JoinStream(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
	userID, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
	if !ok {
		// If user ID is not found, RPC was called without a session token.
		return "", errors.New("Invalid context")
	}
	sessionID, ok := ctx.Value(runtime.RUNTIME_CTX_SESSION_ID).(string)
	if !ok {
		// If session ID is not found, RPC was not called over a connected socket.
		return "", errors.New("Invalid context")
	}

	mode := uint8(2)
	label := "Global Chat Room"
	hidden := false
	persistence := false
	if _, err := nk.StreamUserJoin(mode, "", "", label, userID, sessionID, hidden, persistence, ""); err != nil {
		return "", err
	}

	return "Success", nil
}

// Register as RPC function, this call should be in InitModule.
if err := initializer.RegisterRpc("join", JoinStream); err != nil {
	logger.Error("Unable to register: %v", err)
	return err
}
Server
let joinFunction: nkruntime.RpcFunction = function(ctx: nkruntime.Context, logger: nkruntime.Logger, nk: nkruntime.Nakama, payload: string) {
		let streamId: nkruntime.Stream = {
				mode: 2,
				label: 'Global Chat Room',
		};
		let hidden = false;
		let persistence = false;
		nk.streamUserJoin(ctx.userId, ctx.sessionId, streamId, hidden, persistence);
}

// Register as RPC function, this call should be in InitModule.
initializer.registerRpc('join', joinFunction);

If this user+session is already a member of the stream the operation will be a no-op.

Leave a stream #

Leaving streams is also controlled by the server. To remove a user from a stream the server needs the user’s ID, the unique session ID of the user’s current session, and information about the stream they should be removed from.

As an example we can register an RPC function that will remove the user that calls it from the custom stream.

Server
local function leave(context, _)
	local stream_id = { mode = 2, label = "Global Chat Room" }
	nk.stream_user_leave(context.user_id, context.session_id, stream_id)
end
nk.register_rpc(leave, "leave")
Server
// RPC Code
func LeaveStream(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
	userID, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
	if !ok {
		// If user ID is not found, RPC was called without a session token.
		return "", errors.New("Invalid context")
	}
	sessionID, ok := ctx.Value(runtime.RUNTIME_CTX_SESSION_ID).(string)
	if !ok {
		// If session ID is not found, RPC was not called over a connected socket.
		return "", errors.New("Invalid context")
	}
	label := "Global Chat Room"

	if err := nk.StreamUserLeave(2, "", "", label, userID, sessionID); err != nil {
		return "", err
	}

	return "Success", nil
}

// Register as RPC function, this call should be in InitModule.
if err := initializer.RegisterRpc("leave", LeaveStream); err != nil {
	logger.Error("Unable to register: %v", err)
	return err
}
Server
let leaveFunction: nkruntime.RpcFunction = function(ctx: nkruntime.Context, logger: nkruntime.Logger, nk: nkruntime.Nakama, payload: string) {
		let streamId: nkruntime.Stream = {
				mode: 2,
				label: 'Global Chat Room',
		};
		nk.streamUserLeave(ctx.userId, ctx.sessionId, streamId);
}

// Register as RPC function, this call should be in InitModule.
initializer.registerRpc('leave', leaveFunction);

If this user+session is not a member of the stream the operation will be a no-op.

As with chat channels and real-time multiplayer matches, a client that disconnects is automatically removed from any streams it was part of.

Send data to a stream #

The server can send data to a stream through a function call. The message will be delivered to all users present on the stream. The message payload sent to a stream can be any string, but a structured format such as JSON is recommended.

Server
local stream_id = { mode = 2, label = "Global Chat Room" }
local payload = nk.json_encode({ some = "data" })
nk.stream_send(stream_id, payload)
Server
mode := uint8(2)
label := "Global Chat Room"
// Data does not have to be JSON, but it's a convenient format.
data := "{\"some\":\"data\"}"
nk.StreamSend(mode, "", "", label, data, nil)
Server
let streamId: nkruntime.Stream = {
	mode: 2,
	label: 'Global Chat Room',
};
// The payload is sent as a string, so encode the object as JSON first.
const payload = JSON.stringify({ some: "data" });
nk.streamSend(streamId, payload);

If the stream is empty the operation will be a no-op.
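Because the payload is an arbitrary string, the receiving side cannot assume it is well-formed JSON. The sketch below shows one way to encode a payload before sending and decode it defensively on receipt. `ChatPayload`, `encodePayload`, and `decodePayload` are hypothetical names for this example; the payload shape is an assumption, not a Nakama message format.

```typescript
// Hypothetical payload shape used only for this example.
interface ChatPayload {
  kind: string;
  text: string;
}

// Sending side: turn the object into the string handed to the stream send call.
function encodePayload(payload: ChatPayload): string {
  return JSON.stringify(payload);
}

// Receiving side: parse defensively, since the stream accepts any string.
function decodePayload(data: string): ChatPayload | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(data);
  } catch (e) {
    return null; // not JSON at all
  }
  if (typeof parsed !== "object" || parsed === null) {
    return null; // JSON, but not an object
  }
  const fields = parsed as { [key: string]: unknown };
  const kind = fields["kind"];
  const text = fields["text"];
  if (typeof kind !== "string" || typeof text !== "string") {
    return null; // an object, but not the expected shape
  }
  return { kind: kind, text: text };
}

const wire = encodePayload({ kind: "chat", text: "hello" });
console.log(wire); // {"kind":"chat","text":"hello"}
console.log(decodePayload(wire));
console.log(decodePayload("not json")); // null
```

Returning `null` for anything unexpected keeps the client handler simple: it can ignore messages it does not understand instead of throwing inside the socket callback.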

Close a stream #

Closing a stream removes all presences currently on it. It can be useful to explicitly close a stream and enable the server to reclaim resources more quickly.

Server
local stream_id = { mode = 2, label = "Global Chat Room" }
nk.stream_close(stream_id)
Server
mode := uint8(2)
label := "Global Chat Room"
nk.StreamClose(mode, "", "", label)
Server
let streamId: nkruntime.Stream = {
		mode: 2,
		label: 'Global Chat Room',
};
nk.streamClose(streamId);

Counting stream presences #

The server can peek at the presences on a stream to obtain a quick count without processing the full list of stream presences.

Server
local stream_id = { mode = 2, label = "Global Chat Room" }
local count = nk.stream_count(stream_id)
Server
mode := uint8(2)
label := "Global Chat Room"
count, err := nk.StreamCount(mode, "", "", label)
if err != nil {
	// Handle error here.
}
Server
let streamId: nkruntime.Stream = {
		mode: 2,
		label: 'Global Chat Room',
};
let count = nk.streamCount(streamId);

Listing stream presences #

A stream's presence list contains every user currently online and connected to that stream, along with the session ID each user is connected through and additional metadata.

Server
local stream_id = { mode = 2, label = "Global Chat Room" }
local presences = nk.stream_user_list(stream_id)

for _, presence in ipairs(presences) do
	nk.logger_info("Found user ID " .. presence.user_id)
end
Server
mode := uint8(2)
label := "Global Chat Room"
includeHidden := true
includeNotHidden := true

members, err := nk.StreamUserList(mode, "", "", label, includeHidden, includeNotHidden)
if err != nil {
	// Handle error here
}

for _, m := range members {
	logger.Info("Found user: %s\n", m.GetUserId())
}
Server
let streamId: nkruntime.Stream = {
		mode: 2,
		label: 'Global Chat Room',
};
let presences = nk.streamUserList(streamId);
presences?.forEach(function (p) {
		logger.info('Found user: %s', p.userId);
});

Check a stream presence #

If only a single user is needed the server can check if that user is present on a stream and retrieve their presence and metadata.

As an example we can register an RPC function that will check if the user that calls it is active on a custom stream.

Server
local function check(context, _)
	local stream_id = { mode = 2, label = "Global Chat Room" }
	local meta = nk.stream_user_get(context.user_id, context.session_id, stream_id)

	-- Meta is nil if the user is not present on the stream.
	if (meta) then
		nk.logger_info("User found on stream!")
	end
end
nk.register_rpc(check, "check")
Server
func CheckStream(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
	userID, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
	if !ok {
		// If user ID is not found, RPC was called without a session token.
		return "", errors.New("Invalid context")
	}
	sessionID, ok := ctx.Value(runtime.RUNTIME_CTX_SESSION_ID).(string)
	if !ok {
		// If session ID is not found, RPC was not called over a connected socket.
		return "", errors.New("Invalid context")
	}

	mode := uint8(2)
	label := "Global Chat Room"

	if metaPresence, err := nk.StreamUserGet(mode, "", "", label, userID, sessionID); err != nil {
		// Handle error.
	} else if metaPresence != nil {
		logger.Info("User found on stream")
	} else {
		logger.Info("User not found on stream")
	}

	return "Success", nil
}

// Register as RPC function, this call should be in InitModule.
if err := initializer.RegisterRpc("check", CheckStream); err != nil {
	logger.Error("Unable to register: %v", err)
	return err
}
Server
let streamId: nkruntime.Stream = {
		mode: 2,
		label: 'Global Chat Room',
};
let meta = nk.streamUserGet(ctx.userId, ctx.sessionId, streamId);
if (meta) {
		logger.info('User found on stream');
}

Built-in streams #

The server’s real-time features such as chat channels, multiplayer, parties, and notifications are built on top of streams.

Once you understand the structure of these streams, server runtime code can authoritatively change the behavior of any of these features.

Stream mode values must be in the range 0-255, but note that the values used by these built-in streams are reserved and cannot be used by custom streams.

| Stream | Mode | Subject | Subcontext | Label | Information |
| --- | --- | --- | --- | --- | --- |
| Notifications | 0 | User ID | - | - | Controls the delivery of in-app notifications to connected users. |
| Status | 1 | User ID | - | - | Controls the status feature and broadcasting updates to friends. |
| Chat Channel | 2 | - | - | “channel name” | Membership to a chat channel. |
| Group Chat | 3 | Group ID | - | - | A group’s private chat channel. |
| Direct Message | 4 | User ID | User ID | - | A private direct message conversation between two users. |
| Relayed Match | 5 | Match ID | - | - | Membership and message routing for a relayed real-time multiplayer match. |
| Authoritative Match | 6 | Match ID | - | “nakama node name” | Membership and message routing for an authoritative real-time multiplayer match. |
| Party | 7 | Party ID | - | - | Membership and message routing for a party. |

Using these stream identifiers with the functions described above allows full control over the internal behavior of these features.
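To make the table easier to use from code, the identifiers can be wrapped in small helper functions. The sketch below is one possible arrangement, not something the Nakama runtime provides: `BuiltinMode` and the helper names are hypothetical, and for direct messages the table does not say which user goes in which field, so the argument order used here is an assumption.

```typescript
// Standalone sketch; these helpers are illustrative and not part of the Nakama API.
interface StreamId {
  mode: number;
  subject?: string;
  subcontext?: string;
  label?: string;
}

// Mode values copied from the built-in streams table.
const BuiltinMode = {
  Notifications: 0,
  Status: 1,
  ChatChannel: 2,
  GroupChat: 3,
  DirectMessage: 4,
  RelayedMatch: 5,
  AuthoritativeMatch: 6,
  Party: 7,
};

function notificationsStream(userId: string): StreamId {
  return { mode: BuiltinMode.Notifications, subject: userId };
}

function chatChannelStream(channelName: string): StreamId {
  return { mode: BuiltinMode.ChatChannel, label: channelName };
}

function groupChatStream(groupId: string): StreamId {
  return { mode: BuiltinMode.GroupChat, subject: groupId };
}

// Assumption: which user is the subject and which is the subcontext is not
// specified in the table, so this simply uses the argument order.
function directMessageStream(userIdA: string, userIdB: string): StreamId {
  return { mode: BuiltinMode.DirectMessage, subject: userIdA, subcontext: userIdB };
}

function partyStream(partyId: string): StreamId {
  return { mode: BuiltinMode.Party, subject: partyId };
}

console.log(notificationsStream("4c2ae592-b2a7-445e-98ec-697694478b1c"));
console.log(chatChannelStream("Global Chat Room"));
```

An identifier built this way has the same shape as the `nkruntime.Stream` objects used in the TypeScript server examples on this page.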

Example: Kick a user from a chat channel #

This code removes a user from a chat channel. If the user has more than one session connected to the channel only the specified one will be removed.

Server
local stream_id = { mode = 2, label = "Global Chat Room" }
local user_id = "4c2ae592-b2a7-445e-98ec-697694478b1c"
local session_id = "bf4a68f6-5279-437d-9663-e1543b6d995c"
nk.stream_user_leave(user_id, session_id, stream_id)
Server
mode := uint8(2)
label := "Global Chat Room"
userID := "4c2ae592-b2a7-445e-98ec-697694478b1c"
sessionID := "bf4a68f6-5279-437d-9663-e1543b6d995c"

if err := nk.StreamUserLeave(mode, "", "", label, userID, sessionID); err != nil {
	// Handle error.
}
Server
let streamId: nkruntime.Stream = {
		mode: 2,
		label: 'Global Chat Room',
};
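// IDs of the user and session to remove from the channel.
let userId = '4c2ae592-b2a7-445e-98ec-697694478b1c';
let sessionId = 'bf4a68f6-5279-437d-9663-e1543b6d995c';
nk.streamUserLeave(userId, sessionId, streamId);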

Example: Stop receiving notifications #

By calling this RPC function a user can “silence” their notifications. Even if they remain online they will no longer receive real-time delivery of any in-app notifications.

Server
local function enable_silent_mode(context, _)
	local stream_id = { mode = 0, subject = context.user_id }
	nk.stream_user_leave(context.user_id, context.session_id, stream_id)
end
nk.register_rpc(enable_silent_mode, "enable_silent_mode")
Server
func EnableSilentMode(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
	userID, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
	if !ok {
		// If user ID is not found, RPC was called without a session token.
		return "", errors.New("Invalid context")
	}
	sessionID, ok := ctx.Value(runtime.RUNTIME_CTX_SESSION_ID).(string)
	if !ok {
		// If session ID is not found, RPC was not called over a connected socket.
		return "", errors.New("Invalid context")
	}

	if err := nk.StreamUserLeave(0, userID, "", "", userID, sessionID); err != nil {
		// Handle error.
	}

	return "Success", nil
}

// Register as RPC function, this call should be in InitModule.
if err := initializer.RegisterRpc("enable_silent_mode", EnableSilentMode); err != nil {
	logger.Error("Unable to register: %v", err)
	return err
}
Server
let enableSilentModeFn: nkruntime.RpcFunction = function(ctx: nkruntime.Context, logger: nkruntime.Logger, nk: nkruntime.Nakama, payload: string) {
		let streamId: nkruntime.Stream = {
				mode: 0,
				subject: ctx.userId,
		};
		nk.streamUserLeave(ctx.userId, ctx.sessionId, streamId);
}

// Register as RPC function, this call should be in InitModule.
initializer.registerRpc('enable_silent_mode', enableSilentModeFn);