Streams are a powerful feature that gives you full control over Nakama’s internal real-time routing and delivery.
Nakama’s real-time message routing and delivery subsystem is organized into streams. Streams tie together clients interested in certain message types and allow Nakama’s internal components to deliver messages to relevant users.
Clients may receive messages and data from streams, but are not allowed to directly join, leave, or send data themselves. These functions are only available in the server code runtime.
All of the higher-level real-time features in the server (chat channels, multiplayer, notifications, etc.) are built on top of streams. You do not need to understand or use streams directly to use these features.
Streams are a way to address a set of online users and deliver messages to them. Each stream maintains a list of presences that uniquely identify a user with the socket they’re connected on. When the server sends a message to a stream it will be delivered to all clients identified by the presences.
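As a rough mental model of the paragraph above, a stream can be pictured as a collection of presences keyed by user and session. The sketch below is an illustrative in-memory model and not Nakama's implementation; `StreamModel`, `ModelPresence`, `join`, and `send` are hypothetical names introduced only for this example.

```typescript
// Illustrative in-memory model only; not Nakama's internal implementation.
// All names here (StreamModel, ModelPresence, join, send) are hypothetical.
interface ModelPresence {
  userId: string;
  sessionId: string;
}

class StreamModel {
  // Keyed by user ID plus session ID, so one user on two sockets has two presences.
  private presences = new Map<string, ModelPresence>();

  join(p: ModelPresence): void {
    this.presences.set(p.userId + ":" + p.sessionId, p);
  }

  // Calls deliver once per presence and returns how many deliveries were made.
  send(data: string, deliver: (sessionId: string, data: string) => void): number {
    this.presences.forEach((p) => deliver(p.sessionId, data));
    return this.presences.size;
  }
}

const modelStream = new StreamModel();
modelStream.join({ userId: "u1", sessionId: "s1" });
modelStream.join({ userId: "u1", sessionId: "s2" }); // same user, second socket
modelStream.join({ userId: "u2", sessionId: "s3" });

const deliveries: string[] = [];
const deliveredCount = modelStream.send("hello", (sessionId, data) => {
  deliveries.push(sessionId + " <- " + data);
});
console.log(deliveredCount, deliveries);
```

Keying by both IDs reflects the point that a presence identifies a user together with the socket they are connected on, so the same user can appear more than once.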
Presences may be marked with an optional persistence flag. The server can observe this flag when handling messages delivered to a stream to decide whether the message data should be stored in the database. The real-time chat feature uses this flag to decide if messages should be stored so clients can request message history.
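One way to picture the persistence flag is as a per-presence switch that server code consults before storing a message. This is a minimal sketch under that assumption; `SendingPresence`, `StoredMessage`, `messageHistory`, and `handleStreamMessage` are hypothetical, and the array stands in for a database.

```typescript
// Hypothetical sketch: how server code might act on the persistence flag.
interface SendingPresence {
  userId: string;
  sessionId: string;
  persistence: boolean;
}

interface StoredMessage {
  senderId: string;
  content: string;
}

// Stands in for a database table; a real server would write to storage instead.
const messageHistory: StoredMessage[] = [];

// Returns true when the message was stored, false when it was delivery-only.
function handleStreamMessage(sender: SendingPresence, content: string): boolean {
  if (!sender.persistence) {
    return false;
  }
  messageHistory.push({ senderId: sender.userId, content: content });
  return true;
}

const storedFirst = handleStreamMessage(
  { userId: "u1", sessionId: "s1", persistence: true }, "kept");
const storedSecond = handleStreamMessage(
  { userId: "u2", sessionId: "s2", persistence: false }, "transient");
console.log(storedFirst, storedSecond, messageHistory.length);
```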
Streams generate presence events that notify all users currently in the stream when a new user joins or an existing user leaves the stream. Presences may be marked with an optional hidden flag. When this is set to true the server will not generate presence events when this user joins or leaves.
Hidden presences are still full stream members so they do receive data and presence events as normal.
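The interaction between the hidden flag and presence events can be sketched as follows. This is a simplified model, not the server's code: `MemberPresence` and `joinAndNotify` are hypothetical, and it assumes that only members already on the stream are notified of a join.

```typescript
// Hypothetical model of hidden presences and join events.
interface MemberPresence {
  userId: string;
  sessionId: string;
  hidden: boolean;
}

// Adds joiner to members and returns the session IDs that receive a join event.
function joinAndNotify(members: MemberPresence[], joiner: MemberPresence): string[] {
  // A hidden joiner produces no presence event at all.
  // Hidden members already on the stream are still notified about visible joiners.
  const notified = joiner.hidden ? [] : members.map((m) => m.sessionId);
  members.push(joiner);
  return notified;
}

const streamMembers: MemberPresence[] = [];
const afterObserver = joinAndNotify(streamMembers,
  { userId: "observer", sessionId: "s1", hidden: true });
const afterPlayerA = joinAndNotify(streamMembers,
  { userId: "playerA", sessionId: "s2", hidden: false });
const afterPlayerB = joinAndNotify(streamMembers,
  { userId: "playerB", sessionId: "s3", hidden: false });
console.log(afterObserver, afterPlayerA, afterPlayerB);
```

In this model the hidden observer joins silently, yet it is included in the notifications for both players who join afterwards.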
Clients can register an event handler to consume stream data objects when received over the socket. The handler function will be called once for each stream data message.
Client (JavaScript)
socket.onstreamdata = (streamdata) => {
  console.log("Received data from stream: %o", streamdata.stream.subject);
  console.log("Data content: %o.", streamdata.data);
};
Client (.NET)
socket.ReceivedStreamState += stream =>
{
    Console.WriteLine("Received data from stream: '{0}'", stream.Stream.Subject);
    Console.WriteLine("Data content: {0}", stream.State);
};
Client (Unity)
socket.ReceivedStreamState += stream =>
{
    Debug.LogFormat("Received data from stream: '{0}'", stream.Stream.Subject);
    Debug.LogFormat("Data content: {0}", stream.State);
};
Client (Cocos2d-x C++)
// add listener to header of your class: NRtDefaultClientListener listener;
rtClient->setListener(&listener);
listener.onStreamData([](const NStreamData& data)
{
    CCLOG("Received data from stream: %s", data.stream.subject.c_str());
    CCLOG("Data content: %s", data.data.c_str());
});
Client (Cocos2d-x JS)
socket.onstreamdata = function (streamdata) {
  cc.log("Received data from stream:", streamdata.stream.subject);
  cc.log("Data content:", streamdata.data);
};
Client (C++)
// add listener to header of your class: NRtDefaultClientListener listener;
rtClient->setListener(&listener);
listener.onStreamData([](const NStreamData& data)
{
    cout << "Received data from stream: " << data.stream.subject << endl;
    cout << "Data content: " << data.data << endl;
});
Client (Java)
SocketListener listener = new AbstractSocketListener() {
    @Override
    public void onStreamData(final StreamData data) {
        System.out.println("Received data from stream: " + data.getStream().getSubject());
        System.out.println("Data content: " + data.getData());
    }
};
Client (Godot 3)
func _ready():
    # First, setup the socket as explained in the authentication section.
    socket.connect("received_stream_state", self, "_on_stream_state")

func _on_stream_state(p_state : NakamaRTAPI.StreamData):
    print("Received data from stream: %s" % [p_state.stream])
    print("Data: %s" % [parse_json(p_state.state)])
Client (Godot 4)
func _ready():
    # First, setup the socket as explained in the authentication section.
    socket.received_stream_state.connect(self._on_stream_state)

func _on_stream_state(p_state : NakamaRTAPI.StreamData):
    print("Received data from stream: %s" % [p_state.stream])
    print("Data: %s" % [JSON.parse_string(p_state.state)])
When a new presence joins a stream or an existing presence leaves, the server broadcasts presence events to all users currently on the stream.
Client (JavaScript)
socket.onstreampresence = (streampresence) => {
  console.log("Received presence event for stream: %o", streampresence.id);
  streampresence.joins.forEach((join) => {
    console.log("New user joined: %o", join.user_id);
  });
  streampresence.leaves.forEach((leave) => {
    console.log("User left: %o", leave.user_id);
  });
};
Client (.NET)
socket.ReceivedStreamPresence += presenceEvent =>
{
    Console.WriteLine("Received data from stream: '{0}'", presenceEvent.Stream.Subject);
    foreach (var joined in presenceEvent.Joins)
    {
        Console.WriteLine("Joined: {0}", joined);
    }
    foreach (var left in presenceEvent.Leaves)
    {
        Console.WriteLine("Left: {0}", left);
    }
};
Client (Unity)
socket.ReceivedStreamPresence += presenceEvent =>
{
    Debug.LogFormat("Received data from stream: '{0}'", presenceEvent.Stream.Subject);
    foreach (var joined in presenceEvent.Joins)
    {
        Debug.LogFormat("Joined: {0}", joined);
    }
    foreach (var left in presenceEvent.Leaves)
    {
        Debug.LogFormat("Left: {0}", left);
    }
};
Client (Cocos2d-x C++)
// add listener to header of your class: NRtDefaultClientListener listener;
rtClient->setListener(&listener);
listener.onStreamPresence([](const NStreamPresenceEvent& presence)
{
    CCLOG("Received presence event for stream: %s", presence.stream.subject.c_str());
    for (const NUserPresence& userPresence : presence.joins)
    {
        CCLOG("New user joined: %s", userPresence.user_id.c_str());
    }
    for (const NUserPresence& userPresence : presence.leaves)
    {
        CCLOG("User left: %s", userPresence.user_id.c_str());
    }
});
Client (Cocos2d-x JS)
socket.onstreampresence = function (streampresence) {
  cc.log("Received presence event for stream:", streampresence.id);
  streampresence.joins.forEach(function (join) {
    cc.log("New user joined:", join.user_id);
  });
  streampresence.leaves.forEach(function (leave) {
    cc.log("User left:", leave.user_id);
  });
};
Client (C++)
// add listener to header of your class: NRtDefaultClientListener listener;
rtClient->setListener(&listener);
listener.onStreamPresence([](const NStreamPresenceEvent& presence)
{
    cout << "Received presence event for stream: " << presence.stream.subject << endl;
    for (const NUserPresence& userPresence : presence.joins)
    {
        cout << "New user joined: " << userPresence.user_id << endl;
    }
    for (const NUserPresence& userPresence : presence.leaves)
    {
        cout << "User left: " << userPresence.user_id << endl;
    }
});
The server can place users on any number of streams. To add a user to a stream the server needs the user’s ID, the unique session ID of the user’s current session, and information about the stream they should be placed on.
As an example we can register an RPC function that will place the user that calls it on a custom stream.
Server (Go)
func JoinStream(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
    userID, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
    if !ok {
        // If user ID is not found, RPC was called without a session token.
        return "", errors.New("Invalid context")
    }
    sessionID, ok := ctx.Value(runtime.RUNTIME_CTX_SESSION_ID).(string)
    if !ok {
        // If session ID is not found, RPC was not called over a connected socket.
        return "", errors.New("Invalid context")
    }

    // The mode must be a uint8 to match the StreamUserJoin signature.
    mode := uint8(2)
    label := "Global Chat Room"
    hidden := false
    persistence := false
    if _, err := nk.StreamUserJoin(mode, "", "", label, userID, sessionID, hidden, persistence, ""); err != nil {
        return "", err
    }

    return "Success", nil
}

// Register as RPC function, this call should be in InitModule.
if err := initializer.RegisterRpc("join", JoinStream); err != nil {
    logger.Error("Unable to register: %v", err)
    return err
}
Server (TypeScript)
let joinFunction: nkruntime.RpcFunction = function (ctx: nkruntime.Context, logger: nkruntime.Logger, nk: nkruntime.Nakama, payload: string) {
  let streamId: nkruntime.Stream = {
    mode: 2,
    label: 'Global Chat Room',
  };
  let hidden = false;
  let persistence = false;
  nk.streamUserJoin(ctx.userId, ctx.sessionId, streamId, hidden, persistence);
}

// Register as RPC function, this call should be in InitModule.
initializer.registerRpc('join', joinFunction);
If this user and session pair is already a member of the stream, the operation is a no-op.
Leaving streams is also controlled by the server. To remove a user from a stream the server needs the user’s ID, the unique session ID of the user’s current session, and information about the stream they should be removed from.
As an example we can register an RPC function that will remove the user that calls it from the custom stream.
Server (Go)
// RPC Code
func LeaveStream(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
    userID, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
    if !ok {
        // If user ID is not found, RPC was called without a session token.
        return "", errors.New("Invalid context")
    }
    sessionID, ok := ctx.Value(runtime.RUNTIME_CTX_SESSION_ID).(string)
    if !ok {
        // If session ID is not found, RPC was not called over a connected socket.
        return "", errors.New("Invalid context")
    }

    label := "Global Chat Room"
    if err := nk.StreamUserLeave(2, "", "", label, userID, sessionID); err != nil {
        return "", err
    }

    return "Success", nil
}

// Register as RPC function, this call should be in InitModule.
if err := initializer.RegisterRpc("leave", LeaveStream); err != nil {
    logger.Error("Unable to register: %v", err)
    return err
}
Server (TypeScript)
let leaveFunction: nkruntime.RpcFunction = function (ctx: nkruntime.Context, logger: nkruntime.Logger, nk: nkruntime.Nakama, payload: string) {
  let streamId: nkruntime.Stream = {
    mode: 2,
    label: 'Global Chat Room',
  };
  nk.streamUserLeave(ctx.userId, ctx.sessionId, streamId);
}

// Register as RPC function, this call should be in InitModule.
initializer.registerRpc('leave', leaveFunction);
If this user and session pair is not a member of the stream, the operation is a no-op.
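The no-op behaviour of repeated joins and leaves can be illustrated with a small set-based model. This is only a sketch of the semantics described above; the `Membership` class and its boolean return values are hypothetical and say nothing about what the runtime functions themselves return.

```typescript
// Hypothetical membership model keyed by user ID and session ID.
class Membership {
  private keys = new Set<string>();

  private static key(userId: string, sessionId: string): string {
    return userId + ":" + sessionId;
  }

  // Returns true only when the call actually changed membership.
  join(userId: string, sessionId: string): boolean {
    const k = Membership.key(userId, sessionId);
    if (this.keys.has(k)) {
      return false; // already a member: no-op
    }
    this.keys.add(k);
    return true;
  }

  // Set.delete returns false when the key was absent, which models the no-op.
  leave(userId: string, sessionId: string): boolean {
    return this.keys.delete(Membership.key(userId, sessionId));
  }

  size(): number {
    return this.keys.size;
  }
}

const chatRoom = new Membership();
const joinResults = [chatRoom.join("u1", "s1"), chatRoom.join("u1", "s1")];
const leaveResults = [chatRoom.leave("u1", "s1"), chatRoom.leave("u1", "s1")];
console.log(joinResults, leaveResults, chatRoom.size());
```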
The server can send data to a stream through a function call. The message will be delivered to all users present on the stream. The message payload sent to a stream can be any string, but a structured format such as JSON is recommended.
Server (Go)
mode := uint8(2)
label := "Global Chat Room"
// Data does not have to be JSON, but it's a convenient format.
data := "{\"some\":\"data\"}"
nk.StreamSend(mode, "", "", label, data, nil)
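Because the payload is an opaque string, both sides need to agree on how to encode and decode it. The sketch below shows one possible JSON round trip with basic validation on the receiving side; `ChatPayload`, `encodePayload`, and `decodePayload` are hypothetical helpers, and the field names are purely illustrative.

```typescript
// Hypothetical payload shape; the field names are illustrative only.
interface ChatPayload {
  kind: string;
  text: string;
}

// Produces the string that would be passed as the stream data.
function encodePayload(p: ChatPayload): string {
  return JSON.stringify(p);
}

// Returns null when the string is not valid JSON or lacks the expected fields.
function decodePayload(raw: string): ChatPayload | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (e) {
    return null;
  }
  if (typeof parsed !== "object" || parsed === null) {
    return null;
  }
  const candidate = parsed as { kind?: unknown; text?: unknown };
  if (typeof candidate.kind !== "string" || typeof candidate.text !== "string") {
    return null;
  }
  return { kind: candidate.kind, text: candidate.text };
}

const encoded = encodePayload({ kind: "announcement", text: "Server restart in 5 minutes" });
const decoded = decodePayload(encoded);
const rejected = decodePayload("not json");
console.log(encoded, decoded, rejected);
```

Validating on receipt keeps a client handler from failing on a malformed or unexpected message.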
Closing a stream removes all presences currently on it. It can be useful to explicitly close a stream and enable the server to reclaim resources more quickly.
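The effect of closing a stream can be pictured with a small registry model. This is a sketch of the semantics only: `StreamId`, `streamKey`, `joinStream`, and `closeStream` are hypothetical names, and the mode value 123 is an arbitrary choice for a custom stream.

```typescript
// Hypothetical stream identifier mirroring the mode/subject/subcontext/label fields.
interface StreamId {
  mode: number;
  subject?: string;
  subcontext?: string;
  label?: string;
}

function streamKey(id: StreamId): string {
  return [id.mode, id.subject || "", id.subcontext || "", id.label || ""].join("|");
}

// Maps a stream key to the set of session IDs present on that stream.
const registry = new Map<string, Set<string>>();

function joinStream(id: StreamId, sessionId: string): void {
  const key = streamKey(id);
  const sessions = registry.get(key) || new Set<string>();
  sessions.add(sessionId);
  registry.set(key, sessions);
}

// Removes every presence at once and returns how many were dropped.
function closeStream(id: StreamId): number {
  const key = streamKey(id);
  const sessions = registry.get(key);
  registry.delete(key);
  return sessions ? sessions.size : 0;
}

const lobby: StreamId = { mode: 123, label: "Lobby" };
joinStream(lobby, "s1");
joinStream(lobby, "s2");
const dropped = closeStream(lobby);
const droppedAgain = closeStream(lobby);
console.log(dropped, droppedAgain, registry.size);
```

Deleting the registry entry, rather than emptying the set, models the point that the resources tracking the stream can be reclaimed immediately.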
A stream's presence list contains every user currently online and connected to that stream, along with the session ID they are connected through and additional metadata.
Server (Lua)
local stream_id = { mode = 2, label = "Global Chat Room" }
local presences = nk.stream_user_list(stream_id)

for _, presence in ipairs(presences) do
  nk.logger_info("Found user ID " .. presence.user_id)
end
Server (Go)
mode := uint8(2)
label := "Global Chat Room"
includeHidden := true
includeNotHidden := true

members, err := nk.StreamUserList(mode, "", "", label, includeHidden, includeNotHidden)
if err != nil {
    // Handle error here
}

for _, m := range members {
    logger.Info("Found user: %s\n", m.GetUserId())
}
If only a single user is needed, the server can check whether that user is present on a stream and retrieve their presence and metadata.
As an example we can register an RPC function that will check if the user that calls it is active on a custom stream.
Server (Lua)
local function check(context, _)
  local stream_id = { mode = 2, label = "Global Chat Room" }
  local meta = nk.stream_user_get(context.user_id, context.session_id, stream_id)

  -- Meta is nil if the user is not present on the stream.
  if (meta) then
    nk.logger_info("User found on stream!")
  end
end
nk.register_rpc(check, "check")
Server (Go)
func CheckStream(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
    userID, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
    if !ok {
        // If user ID is not found, RPC was called without a session token.
        return "", errors.New("Invalid context")
    }
    sessionID, ok := ctx.Value(runtime.RUNTIME_CTX_SESSION_ID).(string)
    if !ok {
        // If session ID is not found, RPC was not called over a connected socket.
        return "", errors.New("Invalid context")
    }

    mode := uint8(2)
    label := "Global Chat Room"
    if metaPresence, err := nk.StreamUserGet(mode, "", "", label, userID, sessionID); err != nil {
        // Handle error.
    } else if metaPresence != nil {
        logger.Info("User found on stream")
    } else {
        logger.Info("User not found on stream")
    }

    return "Success", nil
}

// Register as RPC function, this call should be in InitModule.
if err := initializer.RegisterRpc("check", CheckStream); err != nil {
    logger.Error("Unable to register: %v", err)
    return err
}
Server (TypeScript)
let streamId: nkruntime.Stream = {
  mode: 2,
  label: 'Global Chat Room',
};
let meta = nk.streamUserGet(ctx.userId, ctx.sessionId, streamId);
if (meta) {
  logger.info('User found on stream');
}
Each of the server's built-in real-time features uses a stream with a fixed structure. By understanding the structure of these streams, the code runtime can authoritatively change any of these features.
Stream mode values must be in the range 0-255, but note that the values used by these built-in streams are reserved and cannot be used by custom streams.
| Stream | Mode | Subject | Subcontext | Label | Information |
| --- | --- | --- | --- | --- | --- |
| Notifications | 0 | User ID | - | - | Controls the delivery of in-app notifications to connected users. |
| Status | 1 | User ID | - | - | Controls the status feature and broadcasting updates to friends. |
| Chat Channel | 2 | - | - | “channel name” | Membership to a chat channel. |
| Group Chat | 3 | Group ID | - | - | A group’s private chat channel. |
| Direct Message | 4 | User ID | User ID | - | A private direct message conversation between two users. |
| Relayed Match | 5 | Match ID | - | - | Membership and message routing for a relayed real-time multiplayer match. |
| Authoritative Match | 6 | Match ID | - | “nakama node name” | Membership and message routing for an authoritative real-time multiplayer match. |
| Party | 7 | Party ID | - | - | Membership and message routing for a party. |
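The range and reservation rules for stream modes can be captured in a small check. This is a minimal sketch based only on the modes listed for the built-in streams above; `RESERVED_MODES` and `isValidCustomMode` are hypothetical helpers, not part of the Nakama runtime.

```typescript
// Modes used by the built-in streams listed above.
const RESERVED_MODES: number[] = [0, 1, 2, 3, 4, 5, 6, 7];

// Hypothetical helper: true when mode is usable for a custom stream.
function isValidCustomMode(mode: number): boolean {
  // A mode must be a whole number that fits in one byte (0-255).
  const isByte = Math.floor(mode) === mode && mode >= 0 && mode <= 255;
  return isByte && RESERVED_MODES.indexOf(mode) === -1;
}

console.log(isValidCustomMode(2));   // reserved by a built-in stream
console.log(isValidCustomMode(123)); // free for custom use
console.log(isValidCustomMode(256)); // out of range
```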
Using these stream identifiers with the functions described above allows full control over the internal behavior of these features.
By calling this RPC function a user can “silence” their notifications. Even if they remain online they will no longer receive real-time delivery of any in-app notifications.
Server (Go)
func EnableSilentMode(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
    userID, ok := ctx.Value(runtime.RUNTIME_CTX_USER_ID).(string)
    if !ok {
        // If user ID is not found, RPC was called without a session token.
        return "", errors.New("Invalid context")
    }
    sessionID, ok := ctx.Value(runtime.RUNTIME_CTX_SESSION_ID).(string)
    if !ok {
        // If session ID is not found, RPC was not called over a connected socket.
        return "", errors.New("Invalid context")
    }

    // Mode 0 is the notifications stream; its subject is the user's ID.
    if err := nk.StreamUserLeave(0, userID, "", "", userID, sessionID); err != nil {
        // Handle error.
    }

    return "Success", nil
}

// Register as RPC function, this call should be in InitModule.
if err := initializer.RegisterRpc("enable_silent_mode", EnableSilentMode); err != nil {
    logger.Error("Unable to register: %v", err)
    return err
}
Server (TypeScript)
let enableSilentModeFn: nkruntime.RpcFunction = function (ctx: nkruntime.Context, logger: nkruntime.Logger, nk: nkruntime.Nakama, payload: string) {
  // Mode 0 is the notifications stream; its subject is the user's ID.
  let streamId: nkruntime.Stream = {
    mode: 0,
    subject: ctx.userId,
  };
  nk.streamUserLeave(ctx.userId, ctx.sessionId, streamId);
}

// Register as RPC function, this call should be in InitModule.
initializer.registerRpc('enable_silent_mode', enableSilentModeFn);