How to Use Satori & Databricks to Predict and Prevent Player Churn #

Data-driven decision-making is the key to modern game development. Understanding player behavior, predicting churn, and optimizing monetization can significantly impact your game’s success.

By integrating Satori with Databricks, you can harness real-time analytics and machine learning to improve player retention, maximize lifetime value (LTV), and personalize player experiences. This guide will walk you through how Satori’s event data can be ingested into Databricks, how ML predictions can be generated, and how to use those insights to enhance your game.

We will cover the following topics in this guide:
  • How to export game analytics from Satori to Databricks
  • How Databricks processes and predicts key player behaviors
  • How to send predictive insights back to Satori
  • How to apply ML-driven insights to improve retention and engagement

Export Satori Events to Databricks #

The first step is to set up Satori to export events to Databricks. To enable this integration, go to the Settings page in your Satori console and select the Data Lakes tab. On this page, you will find the Databricks (S3) section, as shown in the following screenshot.

Databricks Integration Setup

Details for each field are given in the table below.

| Field | Description |
| --- | --- |
| Access Key ID | The AWS IAM access key ID of the user with permission to write to the bucket. |
| Secret Access Key | The secret access key associated with the access key ID. |
| Region | The AWS region your bucket is located in. A full list of regions is available here. |
| Bucket | The name of the bucket to write data to. |
| Real-time | Uploads files as frequently as once per second for real-time processing. |
| Flush Interval (minutes) | The interval, in minutes, at which the file is uploaded or finalized in S3. |
| Max number of rows per Parquet Row Group | The maximum number of rows allowed in each Parquet row group. |

The Real-time and Flush Interval fields are alternatives to each other. If you select the Real-time option, Flush Interval is automatically disabled because the Real-time setting sets the interval to 1 second.

When you complete the setup and press the Save button, your configuration is tested by inserting a dummy file into S3, which is then removed immediately afterwards. You will see that your connection is working in the Databricks section, as shown in the following screenshot.

Active Databricks Integration

At this point, Satori starts exporting events to your AWS bucket as Parquet files.

To receive the Satori events, a Satori non-human account needs to be granted put access to the AWS S3 bucket. You can then create an External Data connection in Databricks.

Databricks External Data Connection

Once the External Data connection is created, you should see Satori events start showing up in Databricks immediately. Refresh the page to see new events continue to arrive.

Using Spark and StructField() you can define the schema of the fields present in the events. Then, using Databricks Autoloader, you can create the streaming DataFrame that will be processed, set any processing and transformation steps (such as mapping specific fields to a DataFrame column or dropping a field altogether if it is not useful), and write the stream to the Delta table.

As an example, subschemas can be separately defined, later to be added to the overall schema object.

from pyspark.sql.types import (
    MapType, StringType, LongType, StructType, StructField
)

event_properties_schema = MapType(StringType(), StringType())
geo_location_schema = StructType([
    StructField("country_code", StringType(), True),
    StructField("region", StringType(), True),
    StructField("sub_region", StringType(), True),
    StructField("state", StringType(), True),
    StructField("city", StringType(), True)
])
schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("event_name", StringType(), True),
    StructField("event_timestamp", LongType(), True),
    # Variable-length map of string key/value properties
    StructField("event_properties", event_properties_schema, True),
])

The resulting PySpark DataFrame should then have the following schema:

event_id:string
event_name:string
event_timestamp:string
event_properties:map
	Key:string
	value:string
event_receive_time:string
user_id:string
user_session_id:string
user_session_issued_at:string
user_session_count:long
user_properties_custom:map
	Key:string
	value:string
user_audiences:array
	element:struct
		Id:string
		Name:string
source:string
client_api_key:string
client_platform:string
client_version:string
client_build_number:long
country_code:string
region:string
sub_region:string
state:string
city:string
user_audience_names:array
	element:string
processing_time:timestamp

The resulting table should look similar to the following:

Events Schema & Table

Run ML Predictions in Databricks #

As part of the cleaning process, malformed events that don’t have an event timestamp or don’t conform to an expected schema should be excluded from the ML pipeline.

Using Spark, malformed events can be flagged and sent to a separate table for further review, or excluded from the processing pipeline altogether, as in the snippet below:

from pyspark.sql import functions as F

file_path = "{location of files}"

streaming_df = (
    spark.readStream
    .format("cloudFiles")  # use Databricks Autoloader
    .option("ignoreCorruptFiles", "true")  # ignore corrupt files
    .schema(schema)  # apply the schema defined above
    .load(file_path)
)

processed_df = (
    streaming_df
    # Exclude "future" events that are likely malformed
    # because they do not conform to expectations
    .filter(F.col("event_timestamp") <= F.unix_timestamp())
)

# Write the cleaned stream to a Delta table (table name illustrative)
(
    processed_df.writeStream
    .format("delta")
    .option("checkpointLocation", "{checkpoint path}")
    .toTable("satori_events")
)

For the purposes of the demo pipeline, a "churned player" is defined as one who has not played the game for a predetermined number of consecutive days. Further testing would be needed to refine this criterion. Players can also be grouped into cohorts by geo-location, audience, and sign-up date for more granular and detailed predictions.
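This "days since last played" definition can be sketched in plain Python; the 14-day window below is an assumed threshold to tune per game, not a value from the pipeline:

```python
from datetime import datetime, timedelta, timezone

CHURN_WINDOW_DAYS = 14  # assumed threshold; tune per game

def is_churned(last_event_at: datetime, now: datetime,
               window_days: int = CHURN_WINDOW_DAYS) -> bool:
    """A player counts as churned when their most recent event is older than the window."""
    return (now - last_event_at) >= timedelta(days=window_days)

now = datetime(2025, 1, 20, tzinfo=timezone.utc)
print(is_churned(datetime(2025, 1, 1, tzinfo=timezone.utc), now))   # last seen 19 days ago -> True
print(is_churned(datetime(2025, 1, 15, tzinfo=timezone.utc), now))  # last seen 5 days ago -> False
```

The same comparison translates directly into a Spark aggregation over each player's latest `event_timestamp`.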

Send ML Predictions to Satori #

Once we have completed the prediction for any player in Satori, we can write this value back to Satori to use it in our Live Operations.

First, we need to create a custom property. Open the Properties tab under the Taxonomy page. This page lists the custom and default property definitions. Click the Create New Custom Property button at the top right and fill in the form. In this example, let's assume we have a churn prediction and create a property named dbChurnPrediction.

Custom Property Definition in Taxonomy

We are now ready to update the player identities. Satori provides a wide range of Console API endpoints for reading and writing data. In this guide, we will use the POST /v1/console/identity/{id} endpoint to update an identity. We replace {id} with a real player identity ID in Satori and send the following request body to update our custom property.

{
    "properties": {
        "custom": {
            "dbChurnPrediction": "PredictionResult"
        }
    }
}

Here are code snippets to write the prediction result in different languages:

cURL
curl --location 'https://your-game-satori-url.io/v1/console/identity/20421bc3-e1ea-4c52-ad99-6e7dbf0c502a' \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic OTQ2MmY5MDgtMDY1OC00YTdhLWFkMmQtMTVmNmEwM2FlMjlkOg==' \
--data '{
    "properties": {
        "custom": {
            "dbChurnPrediction": "PredictionResult"
        }
    }
}'
Go
package main

import (
	"bytes"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
)

func main() {
	// Base API endpoint for identity-related operations in Satori
	baseURL := "https://your-game-satori-url.io/v1/console/identity" // Replace with your actual values

	// Unique identifier representing a specific game identity
	identityID := "20421bc3-e1ea-4c52-ad99-6e7dbf0c502a"

	// Server key used for authentication (must be kept secure)
	serverKey := "69c8aeba-e76f-4afe-b7a9-31aec885ec70" // Replace with your actual values

	// Construct the complete request URL by appending the identity ID
	url := fmt.Sprintf("%s/%s", baseURL, identityID)

	// Define the request payload
	// This updates a custom property (`dbChurnPrediction`) under the `properties` field
	requestBody := map[string]interface{}{
		"properties": map[string]interface{}{
			"custom": map[string]string{
				"dbChurnPrediction": "PredictionResult",
			},
		},
	}

	// Convert the request payload into JSON
	jsonData, err := json.Marshal(requestBody)
	if err != nil {
		fmt.Println("Error marshalling JSON:", err)
		os.Exit(1)
	}

	// Create a new HTTP POST request with the JSON payload
	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
	if err != nil {
		fmt.Println("Error creating request:", err)
		os.Exit(1)
	}

	// Set request headers:
	// - `Content-Type`: the request body is JSON
	// - `Authorization`: Basic auth, base64 of "<server key>:" (key as username, empty password)
	auth := base64.StdEncoding.EncodeToString([]byte(serverKey + ":"))
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Basic "+auth)

	// Initialize an HTTP client and send the request
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("Error making request:", err)
		os.Exit(1)
	}
	defer resp.Body.Close() // Ensure response body is closed after reading

	// Print the HTTP response status (e.g., 200 OK, 400 Bad Request)
	fmt.Println("Response Status:", resp.Status)
}
JavaScript
import fetch from 'node-fetch';

async function sendRequest() {
	// Base API endpoint for identity-related operations in Satori
	const baseURL = "https://your-game-satori-url.io/v1/console/identity"; // Replace with your actual values

	// Unique identifier representing a specific game identity
	const identityID = "20421bc3-e1ea-4c52-ad99-6e7dbf0c502a";

	// Server key used for authentication (must be kept secure)
	const serverKey = "69c8aeba-e76f-4afe-b7a9-31aec885ec70"; // Replace with your actual values

	// Construct the complete request URL by appending the identity ID
	const url = `${baseURL}/${identityID}`;

	// Define the request payload in JSON format
	// This updates a custom property (`dbChurnPrediction`) under the `properties` field
	const requestBody = {
    	properties: {
        	custom: {
            	dbChurnPrediction: "PredictionResult"
        	}
    	}
	};

	try {
    	// Make the HTTP POST request with the JSON payload
    	const response = await fetch(url, {
        	method: "POST",
        headers: {
            "Content-Type": "application/json",
            // Basic auth: base64 of "<server key>:" (key as username, empty password)
            "Authorization": `Basic ${Buffer.from(serverKey + ":").toString("base64")}`
        },
        	body: JSON.stringify(requestBody)
    	});

    	// Print the HTTP response status (e.g., 200 OK, 400 Bad Request)
    	console.log("Response Status:", response.status);

    	// Optionally, read the response body as JSON
    	const responseData = await response.json();
    	console.log("Response Data:", responseData);

	} catch (error) {
    	console.error("Error making request:", error);
	}
}

// Execute the function
sendRequest();
Lua
local http = require("socket.http")
local ltn12 = require("ltn12")
local json = require("dkjson") -- Requires dkjson library for JSON encoding/decoding

-- Base API endpoint for identity-related operations in Satori
local baseURL = "https://your-game-satori-url.io/v1/console/identity" -- Replace with your actual values

-- Unique identifier representing a specific game identity
local identityID = "20421bc3-e1ea-4c52-ad99-6e7dbf0c502a"

-- Server key used for authentication (must be kept secure)
local serverKey = "69c8aeba-e76f-4afe-b7a9-31aec885ec70" -- Replace with your actual values

-- Construct the complete request URL by appending the identity ID
local url = baseURL .. "/" .. identityID

-- Define the request payload in JSON format
-- This updates a custom property (`dbChurnPrediction`) under the `properties` field
local requestBody = {
	properties = {
    	custom = {
        	dbChurnPrediction = "PredictionResult"
    	}
	}
}

-- Convert the request payload into JSON format
local requestBodyJson = json.encode(requestBody)

-- Prepare the HTTP request headers
-- Basic auth: base64 of "<server key>:"; mime.b64 ships with LuaSocket
local mime = require("mime")
local headers = {
	["Content-Type"] = "application/json",
	["Authorization"] = "Basic " .. mime.b64(serverKey .. ":"),
	["Content-Length"] = tostring(#requestBodyJson)
}

-- Response storage
local responseBody = {}

-- Make the HTTP POST request
local response, status, responseHeaders = http.request{
	url = url,
	method = "POST",
	headers = headers,
	source = ltn12.source.string(requestBodyJson),
	sink = ltn12.sink.table(responseBody)
}

-- Print the HTTP response status (e.g., 200 OK, 400 Bad Request)
print("Response Status:", status)

-- Optionally, decode and print the response data if it is JSON
if responseBody and #responseBody > 0 then
	local responseJson, pos, err = json.decode(table.concat(responseBody))
	if responseJson then
    	print("Response Data:", json.encode(responseJson, {indent=true}))
	else
    	print("Error decoding JSON response:", err)
	end
end
Python
import requests
import json

# Base API endpoint for identity-related operations in Satori
base_url = "https://your-game-satori-url.io/v1/console/identity"  # Replace with your actual values

# Unique identifier representing a specific game identity
identity_id = "20421bc3-e1ea-4c52-ad99-6e7dbf0c502a"

# Server key used for authentication (must be kept secure)
server_key = "69c8aeba-e76f-4afe-b7a9-31aec885ec70"  # Replace with your actual values

# Construct the complete request URL by appending the identity ID
url = f"{base_url}/{identity_id}"

# Define the request payload
# This updates a custom property (`dbChurnPrediction`) under the `properties` field
request_body = {
    "properties": {
        "custom": {
            "dbChurnPrediction": "PredictionResult"
        }
    }
}

try:
    # Make the HTTP POST request with the JSON payload.
    # The auth tuple makes requests build the Basic header for us
    # (base64 of "<server key>:", i.e. key as username, empty password).
    response = requests.post(
        url,
        auth=(server_key, ""),
        json=request_body
    )

    # Print the HTTP response status (e.g., 200 OK, 400 Bad Request)
    print("Response Status:", response.status_code)

    # Optionally, print the response JSON if available
    if response.content:
        try:
            response_data = response.json()
            print("Response Data:", json.dumps(response_data, indent=4))
        except json.JSONDecodeError:
            print("Response is not in JSON format:", response.text)

except requests.RequestException as e:
    print("Error making request:", e)

For authentication, use Basic authentication with your server key as the username and an empty password. You can find your key in the Server Keys tab of the Settings page. Feel free to create a new one if you want, but note that the key needs the Write permission on Identities for your server to update an identity.
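The Authorization header in the cURL example above is standard HTTP Basic auth: the base64 encoding of "&lt;server key&gt;:" (key as username, empty password). A small helper to build it:

```python
import base64

def basic_auth_header(server_key: str) -> str:
    # HTTP Basic auth: base64-encode "<server key>:" (key as username, empty password)
    token = base64.b64encode(f"{server_key}:".encode()).decode()
    return f"Basic {token}"

header = basic_auth_header("69c8aeba-e76f-4afe-b7a9-31aec885ec70")
```

Pass the returned string as the Authorization header on each Console API request.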

Optimize Your Game with ML Insights #

In the previous section, we enriched our player identities with churn prediction information. Now let's put it into action to improve our game, working through two examples:

  1. Modify a feature flag to show a different set of ads to players with a high churn prediction, to reduce churn.
  2. Create an experiment with different economy definitions for players with a low churn prediction, to improve their LTV.

Preparation by Creating Audiences #

We will start by creating some audiences. Satori has very capable filter rules for creating audiences. These filters even let us cast types, such as converting strings to floats. For instance, in this example we created the dbChurnPrediction property as a string property; however, when we check our identities, the values it holds are clearly floats. The following audience filter casts the property to a float and checks whether it is below 0.4.

float(PropertiesCustom("dbChurnPrediction", "2")) < 0.4

We can use this filter to create our Churn-Prediction-Low audience, which contains players with a low churn prediction, as shown below.

New Audience: Low Churn Prediction

Using a similar filter, we also created a Churn-Prediction-High audience containing players predicted to be 70% or more likely to churn.

New Audience: High Churn Prediction

Different Ad Frequency for High Churn Predicted Players #

From our previous experience with a same-genre game, we know that players are less likely to churn and more likely to attach to the game if they see fewer interstitial ads and are offered more boosters for easier progress. Our game has a feature flag defined in Satori to manage the frequency of interstitial and rewarded video ads shown to players. The default configuration of this feature flag is:

{
   "interstitial": {
      "resetAfterRV": true,
      "interval": 120,
      "firstInterval": 240
   },
   "boosterRV": {
      "probability": 0.25,
      "interval": 300
   }
}
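On the client, this flag might be consumed along these lines. This is a hypothetical sketch: the field names come from the flag above, but the pacing logic itself is illustrative, not our game's actual implementation.

```python
import json

flag_json = """
{
   "interstitial": {"resetAfterRV": true, "interval": 120, "firstInterval": 240},
   "boosterRV": {"probability": 0.25, "interval": 300}
}
"""
config = json.loads(flag_json)["interstitial"]

def should_show_interstitial(now_s, session_start_s, last_shown_s, cfg):
    # First interstitial waits firstInterval seconds into the session;
    # subsequent ones wait interval seconds after the previous ad.
    if last_shown_s is None:
        return now_s - session_start_s >= cfg["firstInterval"]
    return now_s - last_shown_s >= cfg["interval"]

print(should_show_interstitial(250, 0, None, config))  # 250s into session, no ad yet -> True
print(should_show_interstitial(300, 0, 250, config))   # only 50s since last ad -> False
```

Because the values are delivered by the flag rather than hardcoded, changing the variant server-side immediately changes the ad pacing for the targeted audience.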

Using Feature Flag Variants, for the Churn-Prediction-High audience only, we can change this configuration to make the game less interruptive and easier to progress through by increasing the interstitial interval and the booster probability, as shown in the following screenshot from Audiences.

New Feature Flag Variant

At Heroic Labs, we strongly recommend always keeping an eye on your important metrics whenever you make changes to the system, either via Satori's dashboards and metrics or via your own dashboards built on Satori's data lake exports. For changes whose effect on your game you are unsure about, we always recommend experimenting first.

Experiment on Economy Definitions for Low Churn Predicted Players #

For our next example, the game designers shared their hypothesis with us: players who are less likely to churn are more likely to spend their hard currency if they find better deals. They also expect this to result in more currency sinking and more revenue generation.

To test this hypothesis, we will set up a new experiment. In this experiment, we will use the Hiro-Economy feature flag, because our game uses Hiro for an easier and more structured metagame implementation.

In its default configuration, this feature flag provides two packs:

  1. Spend 30 gems to get 300 coins
  2. Spend 50 gems to get 600 coins

With our experiment, we want to modify those two packs to:

  1. Spend 40 gems to get 400 coins
  2. Spend 80 gems to get 1000 coins
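It helps to sanity-check the coins-per-gem value of each pack. The baseline packs pay out 10 and 12 coins per gem; Variant A keeps the small pack at 10 coins per gem but raises the large pack to 12.5, while roughly doubling the gem sink per purchase. Using the gem costs and coin amounts from the packs above:

```python
packs = {
    "baseline_small": {"gems": 30, "coins": 300},
    "baseline_large": {"gems": 50, "coins": 600},
    "variant_small":  {"gems": 40, "coins": 400},
    "variant_large":  {"gems": 80, "coins": 1000},
}

# Coins received per gem spent, for each pack
ratios = {name: p["coins"] / p["gems"] for name, p in packs.items()}
print(ratios)
# {'baseline_small': 10.0, 'baseline_large': 12.0, 'variant_small': 10.0, 'variant_large': 12.5}
```

This matches the pack descriptions: the baseline large pack is "20% More!" (12 vs 10 coins per gem) and the variant's is "25% More!" (12.5 vs 10).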

Now let’s set our experiment up. We will first enter the experiment details.

New Experiment: Details

Next, we will select the metrics we want to follow. For this experiment, our goal metric is revenue, because our hypothesis aims at more revenue. We will also monitor some other metrics.

New Experiment: Metrics

After the metrics, we will select our target audience: Churn-Prediction-Low.

New Experiment: Audience

For this experiment, we will have two variants: the baseline, which is unchanged, and Variant A, the new store configuration we want to test.

New Experiment: Variants
// Baseline
{
 "store_items": {
   "small_coin_pack": {
     "name": "Small Coin Pack",
     "description": "",
     "category": "Store Resources",
     "cost": {
       "currencies": {
         "gem": 30
       }
     },
     "reward": {
       "guaranteed": {
         "currencies": {
           "coins": {
             "min": 300,
             "max": 300
           }
         }
       }
     }
   },
   "large_coin_pack": {
     "name": "Large Coin Pack",
     "description": "20% More!",
     "category": "Store Resources",
     "cost": {
       "currencies": {
         "gem": 50
       }
     },
     "reward": {
       "guaranteed": {
         "currencies": {
           "coins": {
             "min": 600,
             "max": 600
           }
         }
       }
     }
   }
 }
}
// Variant A
{
 "store_items": {
   "small_coin_pack": {
     "name": "Small Coin Pack",
     "description": "",
     "category": "Store Resources",
     "cost": {
       "currencies": {
         "gem": 40
       }
     },
     "reward": {
       "guaranteed": {
         "currencies": {
           "coins": {
             "min": 400,
             "max": 400
           }
         }
       }
     }
   },
   "large_coin_pack": {
     "name": "Large Coin Pack",
     "description": "25% More!",
     "category": "Store Resources",
     "cost": {
       "currencies": {
         "gem": 80
       }
     },
     "reward": {
       "guaranteed": {
         "currencies": {
           "coins": {
             "min": 1000,
             "max": 1000
           }
         }
       }
     }
   }
 }
}

We also want the variants to have a similar number of players (50% each) during the experiment.
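Satori performs this split for you. For intuition, a deterministic 50/50 assignment can be modeled with a salted hash; this is a hypothetical sketch, not Satori's actual implementation, and the salt name is made up:

```python
import hashlib

def assign_variant(identity_id: str, salt: str = "economy-exp-1") -> str:
    # Hash identity id + experiment salt so each player's assignment is
    # stable for the whole experiment, yet roughly uniform across players
    digest = hashlib.sha256(f"{salt}:{identity_id}".encode()).hexdigest()
    return "variant_a" if int(digest, 16) % 2 else "baseline"
```

Using a per-experiment salt keeps a player in the same bucket for the duration of one experiment while de-correlating their buckets across different experiments.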

New Experiment: Participants

And we complete our experiment setup by setting the start and end times of the experiment phase.

Now it is time to watch the metrics and see whether our hypothesis for improving the game holds.