Data drifting


Core Module

Data drifting is one of the core reasons why model accuracy degrades over time in production. For machine learning models, data drift is the change in model input data that leads to model performance degradation. In practical terms, this means that the model is receiving input that is outside the scope that it was trained on, as seen in the figure below, which shows the underlying distribution of a particular feature slowly increasing in value over two years.

Image

Image credit

In some cases, normalizing a feature in a better way may be enough to make your model generalize again, but this is not always the case. The reason for such a drift is commonly some external factor that you essentially have no control over. That really only leaves you with one option: retrain your model on the newly received input features and deploy that model to production. This process is probably going to repeat over the lifetime of your application if you want to keep it up-to-date with the real world.

Image

Image credit

We now have a solution to the data drift problem, but there is one important detail we have not taken care of: when should we actually trigger retraining? We do not want to wait around for our model performance to degrade, so we need tools that can detect when our data is drifting.

❔ Exercises

For these exercises we are going to use the framework Evidently developed by EvidentlyAI. Evidently currently supports drift detection for both regression and classification models. The exercises are largely taken from here, and in general we recommend that if you are in doubt about an exercise you look at the API docs and examples (their documentation can be a bit lacking sometimes, so you may also have to dive into the source code).

Additionally, we want to stress that data drift detection, concept drift detection etc. is still an active field of research, and multiple frameworks therefore exist for doing this kind of detection. In addition to Evidently, we can also mention NannyML, WhyLogs and Deepchecks.

  1. Start by installing Evidently

    pip install evidently
    

    You will also need scikit-learn and pandas installed if you do not already have them.

  2. Hopefully you have already gone through session S7 on deployment. As part of the deployment exercises about GCP functions you should have developed an application that can classify the iris dataset. Your solution should look something like the script below:

    Example GCP function for iris classification
    sklearn_train_function.py
    import pickle
    
    import functions_framework
    from google.cloud import storage
    
    BUCKET_NAME = "my_sklearn_model_bucket"
    MODEL_FILE = "model.pkl"
    
    client = storage.Client()
    bucket = client.get_bucket(BUCKET_NAME)
    blob = bucket.get_blob(MODEL_FILE)
    my_model = pickle.loads(blob.download_as_string())
    
    
    @functions_framework.http
    def knn_classifier(request):
        """Simple knn classifier function for iris prediction."""
        request_json = request.get_json()
        if request_json and "input_data" in request_json:
            input_data = request_json["input_data"]
            input_data = [float(in_data) for in_data in input_data]
            input_data = [input_data]
            prediction = my_model.predict(input_data)
            return {"prediction": prediction.tolist()}
        return {"error": "No input data provided."}
    

    Start by converting your GCP function into a FastAPI application. The appropriate curl command should look something like this:

    curl -X 'POST' \
        'http://127.0.0.1:8000/iris_v1/?sepal_length=1.0&sepal_width=1.0&petal_length=1.0&petal_width=1.0' \
        -H 'accept: application/json' \
        -d ''
    

    and the response body should look like this:

    { "prediction": "Iris-Setosa", "prediction_int": 0 }
    
    Solution
    iris_fastapi_solution.py
    import pickle
    from collections.abc import Generator
    
    from fastapi import FastAPI
    
    
    def lifespan(app: FastAPI) -> Generator[None]:
        """Load model and classes."""
        global model, classes
        classes = ["Iris-Setosa", "Iris-Versicolour", "Iris-Virginica"]
        with open("model.pkl", "rb") as file:
            model = pickle.load(file)
    
        yield
    
        del model, classes
    
    
    app = FastAPI(lifespan=lifespan)
    
    
    @app.post("/predict")
    def iris_inference(sepal_length: float, sepal_width: float, petal_length: float, petal_width: float):
        """Version 1 of the iris inference endpoint."""
        prediction = model.predict([[sepal_length, sepal_width, petal_length, petal_width]])
        prediction = prediction.item()
        return {"prediction": classes[prediction], "prediction_int": prediction}
    
  1. Next we are going to add some functionality to our application: whenever our application is called, the user input should be saved to a database. However, to not slow down the response to the user, we want to implement this as a background task. A background task is a function that is executed after the user has received their response. Implement a background task that saves the user input to a database, implemented as a simple .csv file. You can read more about background tasks here. The header of the database should look something like this:

      time, sepal_length, sepal_width, petal_length, petal_width, prediction
      2022-12-28 17:24:34.045649, 1.0, 1.0, 1.0, 1.0, 1
      2022-12-28 17:24:44.026432, 2.0, 2.0, 2.0, 2.0, 1
      ...
      

      Thus the input, timestamp and predicted value should all be saved.

      Solution
      iris_fastapi_solution.py
      import pickle
      from collections.abc import Generator
      from datetime import UTC, datetime
      
      from fastapi import BackgroundTasks, FastAPI
      
      
      def lifespan(app: FastAPI) -> Generator[None]:
          """Load model and classes, and create database file."""
          global model, classes
          classes = ["Iris-Setosa", "Iris-Versicolour", "Iris-Virginica"]
          with open("model.pkl", "rb") as file:
              model = pickle.load(file)
      
          with open("prediction_database.csv", "w") as file:
              file.write("time, sepal_length, sepal_width, petal_length, petal_width, prediction\n")
      
          yield
      
          del model
      
      
      app = FastAPI(lifespan=lifespan)
      
      
      def add_to_database(
          now: str,
          sepal_length: float,
          sepal_width: float,
          petal_length: float,
          petal_width: float,
          prediction: int,
      ) -> None:
          """Simple function to add prediction to database."""
          with open("prediction_database.csv", "a") as file:
              file.write(f"{now}, {sepal_length}, {sepal_width}, {petal_length}, {petal_width}, {prediction}\n")
      
      
      @app.post("/predict")
      async def iris_inference(
          sepal_length: float,
          sepal_width: float,
          petal_length: float,
          petal_width: float,
          background_tasks: BackgroundTasks,
      ):
          """Version 2 of the iris inference endpoint."""
          prediction = model.predict([[sepal_length, sepal_width, petal_length, petal_width]])
          prediction = prediction.item()
      
          now = str(datetime.now(tz=UTC))
          background_tasks.add_task(add_to_database, now, sepal_length, sepal_width, petal_length, petal_width, prediction)
          return {"prediction": classes[prediction], "prediction_int": prediction}
      
    2. Call your API a number of times to generate some dummy data in the database, as sketched below.
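
      One way to do this is with a small script that posts random (but plausible) iris measurements to your local endpoint. This is only a sketch: the value ranges, number of calls and URL are our own choices and should be adjusted to your setup.

      import random

      import requests

      url = "http://127.0.0.1:8000/predict"
      for _ in range(20):
          params = {
              "sepal_length": round(random.uniform(4.0, 8.0), 1),
              "sepal_width": round(random.uniform(2.0, 4.5), 1),
              "petal_length": round(random.uniform(1.0, 7.0), 1),
              "petal_width": round(random.uniform(0.1, 2.5), 1),
          }
          response = requests.post(url, params=params, timeout=10)
          print(response.json())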

  3. Create a new data_drift.py file where we are going to implement the data drift detection and reporting. Start by loading both the real iris data and your generated dummy data as pandas data frames.

    import pandas as pd
    from sklearn import datasets
    reference_data = datasets.load_iris(as_frame=True).frame
    current_data = pd.read_csv('prediction_database.csv')
    

    If done correctly you will most likely end up with two data frames that look like

    # reference_data
    sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  target
    0                  5.1               3.5                1.4               0.2       0
    1                  4.9               3.0                1.4               0.2       0
    ...
    148                6.2               3.4                5.4               2.3       2
    149                5.9               3.0                5.1               1.8       2
    [150 rows x 5 columns]
    
    # current_data
    time                         sepal_length   sepal_width   petal_length   petal_width   prediction
    2022-12-28 17:24:34.045649   1.0            1.0            1.0           1.0           1
    ...
    2022-12-28 17:24:34.045649   1.0            1.0            1.0           1.0           1
    [10 rows x 6 columns]
    

    Standardize the data frames such that they have the same column names and drop the time column from the current_data data frame.

    Solution
    import pandas as pd
    from sklearn import datasets
    reference_data = datasets.load_iris(as_frame=True).frame
    reference_data = reference_data.rename(
        columns={
            'sepal length (cm)': 'sepal_length',
            'sepal width (cm)': 'sepal_width',
            'petal length (cm)': 'petal_length',
            'petal width (cm)': 'petal_width',
            'target': 'target'
        }
    )
    
    current_data = pd.read_csv('prediction_database.csv')
    current_data.columns = current_data.columns.str.strip()  # the csv header contains spaces after the commas
    current_data = current_data.drop(columns=['time'])
    
    1. Add the following code to the data_drift.py file to create a report on the data drift:

      from evidently.report import Report
      from evidently.metric_preset import DataDriftPreset
      report = Report(metrics=[DataDriftPreset()])
      report.run(reference_data=reference_data, current_data=current_data)
      report.save_html('report.html')
      

      Open the generated .html page. What does it say about your data? Has it drifted? Make sure to poke around to understand what the different plots are actually showing.

    2. Data drift is not the only kind of report Evidently can generate. We can also get reports on data quality. Look through the documentation of Evidently and add the preset that has to do with data quality to the report. Try adding a few NaN values to your current_data (one way to do this is shown after the solution below) and re-run the report. Check out the report and go over the generated plots to make sure it picked up on the missing values you just added.

      Solution

      The DataQualityPreset checks the quality of the data:

      from evidently.metric_preset import DataDriftPreset, DataQualityPreset
      report = Report(metrics=[DataDriftPreset(), DataQualityPreset()])
      
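      One way to inject the missing values is sketched below. It assumes the current_data dataframe from your data_drift.py script; which cells you corrupt is arbitrary.

      import numpy as np

      # deliberately corrupt a few cells so the data quality metrics have something to find
      current_data.loc[current_data.index[:3], "sepal_length"] = np.nan
      current_data.loc[current_data.index[0], "petal_width"] = np.nan
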
    3. Another important kind of drift is called target drift, where the distribution of the target values has changed. If your training data was balanced and you are now seeing a lot of one class being predicted, this may indicate that your model is not performing as expected or that external factors have changed, which means that you should retrain your model. Find the preset that checks for target drift, add it to the report and re-run the analysis (you may need to align the column names first, see the snippet after the solution below).

      Solution

      The TargetDriftPreset checks the distribution of the target values:

      from evidently.metric_preset import DataDriftPreset, DataQualityPreset, TargetDriftPreset
      report = Report(metrics=[DataDriftPreset(), DataQualityPreset(), TargetDriftPreset()])
      
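      Note that the reference data stores the labels in a target column while our logged predictions sit in a prediction column. A simple way to make the two frames comparable for target drift (an assumption on our part; Evidently's column mapping offers a more explicit alternative) is to rename the column before running the report:

      # give the current data a "target" column so it lines up with the reference data
      current_data = current_data.rename(columns={"prediction": "target"})
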
  4. Evidently reports are meant for debugging, exploration and reporting of results. However, if we want to integrate Evidently functionality into our already developed pipelines, either as a simple script, as part of a GitHub Actions workflow or something else, we need to be able to extract the results in a more programmatic way. This can be done using their Test and TestSuite classes. Implement a simple test that checks if there are any missing values in our dataset and print the results to the console.

    Solution

    Using the .as_dict() method on a TestSuite we can programmatically extract the results of the tests. In particular, the returned dictionary contains a summary key, which in turn contains an all_passed key that is True if all tests passed and False otherwise.

    from evidently.test_suite import TestSuite
    from evidently.tests import TestNumberOfMissingValues
    data_test = TestSuite(tests=[TestNumberOfMissingValues()])
    data_test.run(reference_data=reference_data, current_data=current_data)
    result = data_test.as_dict()
    print(result)
    print("All tests passed: ", result['summary']['all_passed'])
    
    1. Take a look at this colab notebook that contains all tests implemented in Evidently. Pick 5 tests of your choice, where at least 1 fails by default, and implement them as a TestSuite (a sketch of what this could look like is shown below). Then try changing the arguments of the tests so they better fit your use case and get them all passing.
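
      A sketch of such a suite is shown below. It assumes the reference_data and current_data frames from your data_drift.py script; the test names are taken from Evidently's test library at the time of writing and the condition arguments (such as gte) are our own choices, so double-check them against the linked notebook if anything has changed.

      from evidently.test_suite import TestSuite
      from evidently.tests import (
          TestColumnDrift,
          TestNumberOfConstantColumns,
          TestNumberOfDuplicatedRows,
          TestNumberOfMissingValues,
          TestNumberOfRows,
      )

      data_test = TestSuite(
          tests=[
              TestNumberOfMissingValues(),
              TestNumberOfRows(gte=50),  # fails until at least 50 predictions have been logged
              TestNumberOfConstantColumns(),
              TestNumberOfDuplicatedRows(),
              TestColumnDrift(column_name="sepal_length"),
          ]
      )
      data_test.run(reference_data=reference_data, current_data=current_data)
      print(data_test.as_dict()["summary"])
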
  5. (Optional) When doing monitoring in practice, we are not always interested in running the analysis on all data collected from our API; maybe we only want the last N entries, or only observations from the last hour. Since we are already logging the timestamps of when our API is called, we can use them for filtering. Implement a simple filter that

    • Takes an integer n and returns the last n entries in our database
    • Takes an integer t that filters away observations older than t hours
    Solution
    import pandas as pd


    def filter_data(data: pd.DataFrame, n: int | None = None, t: int | None = None) -> pd.DataFrame:
        """Return the last n entries of the database, or only the entries newer than t hours."""
        if n is not None:
            return data.tail(n)
        if t is not None:
            data['time'] = pd.to_datetime(data['time'], utc=True)  # parse timestamps, assuming/converting to UTC
            cutoff = pd.Timestamp.now(tz='UTC') - pd.Timedelta(hours=t)
            return data[data['time'] > cutoff]
        return data
    
  6. Evidently by default only supports structured (i.e. tabular) data, and so does nearly every other framework. The question then becomes how we can extend this to unstructured data such as images or text. The solution is to extract structured features from the data, which we can then run the analysis on.

    1. For images, the simple solution would be to flatten the images and consider each pixel a feature, but this does not work in practice because changes in the individual pixels do not really tell us anything about the image. Instead, we should derive features such as:

      • Average brightness
      • Contrast of an image
      • Image sharpness
      • ...

      These are all numbers that can make up a feature vector for a given image. Try doing this yourself, for example by extracting such features from the MNIST and FashionMNIST datasets, and check if you can detect a drift between the two sets.

      Solution
      image_drift.py
      import numpy as np
      import pandas as pd
      from evidently.metrics import DataDriftTable
      from evidently.report import Report
      from torchvision import datasets, transforms
      
      transform = transforms.Compose([transforms.ToTensor()])
      
      mnist = datasets.MNIST(root="data", train=True, download=True, transform=transform)
      fashion_mnist = datasets.FashionMNIST(root="data", train=True, download=True, transform=transform)
      
      mnist_images = mnist.data.numpy()
      fashion_images = fashion_mnist.data.numpy()
      
      
      def extract_features(images):
          """Extract basic image features from a set of images."""
          features = []
          for img in images:
              avg_brightness = np.mean(img)
              contrast = np.std(img)
              sharpness = np.mean(np.abs(np.gradient(img)))
              features.append([avg_brightness, contrast, sharpness])
          return np.array(features)
      
      
      mnist_features = extract_features(mnist_images)
      fashion_features = extract_features(fashion_images)
      
      feature_columns = ["Average Brightness", "Contrast", "Sharpness"]
      
      mnist_df = np.column_stack((mnist_features, ["MNIST"] * mnist_features.shape[0]))
      fashion_df = np.column_stack((fashion_features, ["FashionMNIST"] * fashion_features.shape[0]))
      
      combined_features = np.vstack((mnist_df, fashion_df))
      
      feature_df = pd.DataFrame(combined_features, columns=feature_columns + ["Dataset"])
      feature_df[feature_columns] = feature_df[feature_columns].astype(float)
      
      reference_data = feature_df[feature_df["Dataset"] == "MNIST"].drop(columns=["Dataset"])
      current_data = feature_df[feature_df["Dataset"] == "FashionMNIST"].drop(columns=["Dataset"])
      
      report = Report(metrics=[DataDriftTable()])
      report.run(reference_data=reference_data, current_data=current_data)
      report.save_html("data_drift.html")
      
    2. (Optional) For text, a common approach is to extract some higher-level embedding, such as the classical GloVe embedding. Try following this tutorial to understand how drift detection is done on text.

    3. Instead of manually specifying the features, let's take a deep-learning-based approach to getting features from unstructured data. To do this, let's consider the CLIP model, which is a state-of-the-art model for connecting text to images, e.g. for image captioning. For our purpose this is perfect, because we can use the model to get abstract feature embeddings for both images and text. Implement a simple script that extracts features from an image and a text using CLIP. We recommend using the Huggingface implementation for doing this. What is the size of the feature vector?

      Solution

      Both img_features and text_features for the standard CLIP model are 512-dimensional abstract feature embeddings. We cannot interpret these features directly, but they should be able to tell us something about our data distribution.

      clip_features.py
      import requests
      from PIL import Image
      from transformers import CLIPModel, CLIPProcessor
      
      model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
      processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
      
      url = "http://images.cocodataset.org/val2017/000000039769.jpg"
      image = Image.open(requests.get(url, stream=True, timeout=30).raw)
      
      # set either text=None or images=None when only the other is needed
      inputs = processor(text=["a photo of a cat", "a photo of a dog"], images=image, return_tensors="pt", padding=True)
      
      img_features = model.get_image_features(inputs["pixel_values"])
      text_features = model.get_text_features(inputs["input_ids"], inputs["attention_mask"])
      print(img_features.shape, text_features.shape)
      
    4. Run your CLIP script on two different datasets, for example CIFAR10 and SVHN for images, or IMDB movie reviews and Amazon reviews for text. Then run the data drift detection on the extracted features (a sketch of how this could look for images is shown below). What do you see? Does the data drift?
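
      A sketch of the image case is shown below. The dataset choices, the number of embedded images and the column names are our own; embedding the full datasets would take a long time, so only a small subset is used here.

      import pandas as pd
      import torch
      from evidently.metric_preset import DataDriftPreset
      from evidently.report import Report
      from torchvision import datasets
      from transformers import CLIPModel, CLIPProcessor

      model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
      processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

      cifar10 = datasets.CIFAR10(root="data", train=True, download=True)
      svhn = datasets.SVHN(root="data", split="train", download=True)


      def clip_image_features(dataset, n: int = 200) -> pd.DataFrame:
          """Embed the first n images of a dataset with CLIP and return them as a dataframe."""
          images = [dataset[i][0] for i in range(n)]  # torchvision returns PIL images when no transform is given
          inputs = processor(images=images, return_tensors="pt")
          with torch.no_grad():
              features = model.get_image_features(pixel_values=inputs["pixel_values"])
          return pd.DataFrame(features.numpy(), columns=[f"feature_{i}" for i in range(features.shape[1])])


      reference_data = clip_image_features(cifar10)
      current_data = clip_image_features(svhn)

      report = Report(metrics=[DataDriftPreset()])
      report.run(reference_data=reference_data, current_data=current_data)
      report.save_html("clip_drift.html")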

  7. (Optional) If we have multiple applications and want to run monitoring for each of them, we often also want the monitoring itself to be a deployed application (that only we can access). Implement a /monitoring endpoint that does all the reporting we just went through, such that you have two endpoints:

    http://127.0.0.1:8000/predict/?sepal_length=1.0&sepal_width=1.0&petal_length=1.0&petal_width=1.0  # user endpoint
    http://127.0.0.1:8000/monitoring/  # monitoring endpoint
    

    Our monitoring endpoint should return an HTML page showing either an Evidently report or a test suite. Try implementing this endpoint.

    Solution
    iris_fastapi_solution.py
    import pickle
    from collections.abc import Generator
    from datetime import UTC, datetime
    
    import anyio
    import pandas as pd
    from evidently.metric_preset import (
        DataDriftPreset,
        DataQualityPreset,
        TargetDriftPreset,
    )
    from evidently.report import Report
    from fastapi import BackgroundTasks, FastAPI
    from fastapi.responses import HTMLResponse
    from sklearn import datasets
    
    
    def lifespan(app: FastAPI) -> Generator[None]:
        """Load model and classes, and create database file."""
        global model, classes
        classes = ["Iris-Setosa", "Iris-Versicolour", "Iris-Virginica"]
        with open("model.pkl", "rb") as file:
            model = pickle.load(file)
    
        with open("prediction_database.csv", "w") as file:
            file.write("time, sepal_length, sepal_width, petal_length, petal_width, prediction\n")
    
        yield
    
        del model
    
    
    app = FastAPI(lifespan=lifespan)
    
    
    def add_to_database(
        now: str,
        sepal_length: float,
        sepal_width: float,
        petal_length: float,
        petal_width: float,
        prediction: int,
    ) -> None:
        """Simple function to add prediction to database."""
        with open("prediction_database.csv", "a") as file:
            file.write(f"{now}, {sepal_length}, {sepal_width}, {petal_length}, {petal_width}, {prediction}\n")
    
    
    @app.post("/predict")
    async def iris_inference(
        sepal_length: float,
        sepal_width: float,
        petal_length: float,
        petal_width: float,
        background_tasks: BackgroundTasks,
    ):
        """Version 2 of the iris inference endpoint."""
        prediction = model.predict([[sepal_length, sepal_width, petal_length, petal_width]])
        prediction = prediction.item()
    
        now = str(datetime.now(tz=UTC))
        background_tasks.add_task(add_to_database, now, sepal_length, sepal_width, petal_length, petal_width, prediction)
        return {"prediction": classes[prediction], "prediction_int": prediction}
    
    
    @app.get("/monitoring", response_class=HTMLResponse)
    async def iris_monitoring():
        """Simple get request method that returns a monitoring report."""
        reference_data: pd.DataFrame = datasets.load_iris(as_frame=True).frame
        reference_data = reference_data.rename(
            columns={
                "sepal length (cm)": "sepal_length",
                "sepal width (cm)": "sepal_width",
                "petal length (cm)": "petal_length",
                "petal width (cm)": "petal_width",
                "target": "target",
            }
        )
        current_data = pd.read_csv("prediction_database.csv")
        current_data.columns = current_data.columns.str.strip()  # the csv header contains spaces after the commas
        current_data = current_data.drop(columns=["time"]).rename(columns={"prediction": "target"})
    
        data_drift_report = Report(metrics=[DataDriftPreset(), DataQualityPreset(), TargetDriftPreset()])
        data_drift_report.run(current_data=current_data, reference_data=reference_data)
        data_drift_report.save_html("monitoring.html")
    
        async with await anyio.open_file("monitoring.html", encoding="utf-8") as f:
            html_content = await f.read()
    
        return HTMLResponse(content=html_content, status_code=200)
    

Data drift in the Cloud

In the next section we are going to look at how we can incorporate data drift detection into our cloud environment. In particular, we are going to look at how we can deploy a monitoring application that runs on a schedule and reports its statistics directly back into GCP for us to study.

❔ Exercises

In this set of exercises we are going to deploy a machine learning model for sentiment analysis, trained on Google Play Store reviews. The model's task is to predict whether a user's review is positive, neutral or negative in sentiment. We are then going to deploy a monitoring service that checks if the distribution of the reviews has drifted over time. This may be useful if we are seeing a decrease in the number of positive reviews over time, which may indicate that our application is not performing as expected.

We have already downloaded the training data, created a training script and trained a model for you. The training data and the trained model are available to download from the following Google Drive folder, which can be quickly downloaded by running the following commands (using the gdown Python package):

pip install gdown
gdown --folder https://drive.google.com/drive/folders/19rZSGk4A4O7kDqPQiomgV0TiZkRpZ1Rs?usp=sharing

And the training script can be seen below. You are free to retrain the model yourself, but it takes about 30 minutes to train using a GPU. Overall the model scores around 74% accuracy on a held-out test set. We recommend that you scroll through the files to get an understanding of what is going on.

Training script for sentiment analysis model
sentiment_classifier.py
# All credits to
# https://www.kaggle.com/code/prakharrathi25/sentiment-analysis-using-bert/notebook
from collections import defaultdict

import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from torch import nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
from transformers import BertModel, BertTokenizer, get_linear_schedule_with_warmup

MODEL_NAME = "bert-base-cased"
BATCH_SIZE = 16
MAX_LEN = 160
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

df = pd.read_csv("reviews.csv")


def to_sentiment(rating):
    """Convert rating to sentiment class."""
    rating = int(rating)
    if rating <= 2:
        return 0  # Negative
    if rating == 3:
        return 1  # Neutral
    return 2  # Positive


# Apply to the dataset
df["sentiment"] = df.score.apply(to_sentiment)
class_names = ["negative", "neutral", "positive"]

# Build a BERT based tokenizer
tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)


class GPReviewDataset(Dataset):
    """Google Play Review Dataset class."""

    def __init__(self, reviews, targets, tokenizer, max_len):
        self.reviews = reviews
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the length of the dataset."""
        return len(self.reviews)

    def __getitem__(self, item):
        """Get a single review from the dataset and tokenize it."""
        review = str(self.reviews[item])
        target = self.targets[item]

        # Encoded format to be returned
        encoding = self.tokenizer.encode_plus(
            review,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding="max_length",
            return_attention_mask=True,
            return_tensors="pt",
            truncation=True,
        )
        return {
            "review_text": review,
            "input_ids": encoding["input_ids"].flatten(),
            "attention_mask": encoding["attention_mask"].flatten(),
            "targets": torch.tensor(target, dtype=torch.long),
        }


df_train, df_test = train_test_split(df, test_size=0.2)
df_val, df_test = train_test_split(df_test, test_size=0.5)


def create_data_loader(df, tokenizer, max_len, batch_size):
    """Create a data loader for the dataset."""
    ds = GPReviewDataset(
        reviews=df.content.to_numpy(), targets=df.sentiment.to_numpy(), tokenizer=tokenizer, max_len=max_len
    )
    return DataLoader(ds, batch_size=batch_size)


train_data_loader = create_data_loader(df_train, tokenizer, MAX_LEN, BATCH_SIZE)
val_data_loader = create_data_loader(df_val, tokenizer, MAX_LEN, BATCH_SIZE)
test_data_loader = create_data_loader(df_test, tokenizer, MAX_LEN, BATCH_SIZE)


# Build the Sentiment Classifier class
class SentimentClassifier(nn.Module):
    """Sentiment Classifier class. Combines BERT model with a dropout and linear layer."""

    def __init__(self, n_classes, model_name=MODEL_NAME):
        super().__init__()
        self.bert = BertModel.from_pretrained(model_name)
        self.drop = nn.Dropout(p=0.3)
        self.out = nn.Linear(self.bert.config.hidden_size, n_classes)

    def forward(self, input_ids, attention_mask):
        """Forward pass of the model."""
        output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        output = self.drop(output[1])
        return self.out(output)


model = SentimentClassifier(len(class_names))
model = model.to(device)

EPOCHS = 10

# Optimizer Adam
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
total_steps = len(train_data_loader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

# Set the loss function
loss_fn = nn.CrossEntropyLoss().to(device)


def train_epoch(model, data_loader, loss_fn, optimizer, device, scheduler, n_examples):
    """Train the model for one epoch e.g. one pass through the dataset."""
    model = model.train()
    losses = []
    correct_predictions = 0

    for d in tqdm(data_loader):
        input_ids = d["input_ids"].to(device)
        attention_mask = d["attention_mask"].to(device)
        targets = d["targets"].to(device)

        outputs = model(input_ids=input_ids, attention_mask=attention_mask)

        _, preds = torch.max(outputs, dim=1)
        loss = loss_fn(outputs, targets)
        correct_predictions += torch.sum(preds == targets)
        losses.append(loss.item())

        # Backward prop
        loss.backward()

        # Gradient Descent
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

    return correct_predictions.double() / n_examples, np.mean(losses)


def eval_model(model, data_loader, loss_fn, device, n_examples):
    """Evaluate the model."""
    model = model.eval()

    losses = []
    correct_predictions = 0

    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)

            # Get model outputs
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)

            _, preds = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, targets)

            correct_predictions += torch.sum(preds == targets)
            losses.append(loss.item())

    return correct_predictions.double() / n_examples, np.mean(losses)


history = defaultdict(list)
best_accuracy = 0

for epoch in range(EPOCHS):
    # Show details
    print(f"Epoch {epoch + 1}/{EPOCHS}")
    print("-" * 10)

    train_acc, train_loss = train_epoch(model, train_data_loader, loss_fn, optimizer, device, scheduler, len(df_train))

    print(f"Train loss {train_loss} accuracy {train_acc}")

    # Get model performance (accuracy and loss)
    val_acc, val_loss = eval_model(model, val_data_loader, loss_fn, device, len(df_val))

    print(f"Val   loss {val_loss} accuracy {val_acc}")
    history["train_acc"].append(train_acc)
    history["train_loss"].append(train_loss)
    history["val_acc"].append(val_acc)
    history["val_loss"].append(val_loss)

    # If we beat prev performance
    if val_acc > best_accuracy:
        torch.save(model.state_dict(), "bert_sentiment_model.pt")
        best_accuracy = val_acc

test_acc, _ = eval_model(model, test_data_loader, loss_fn, device, len(df_test))
print(f"Test Accuracy {test_acc.item()}")
  1. To begin with, let's upload the training data and model to GCP. Create a new GCP bucket called gcp_monitoring_exercise (or something similar) and upload the training data and the trained model to the bucket.

    Solution

    This can be done by running the following commands or manually uploading the files to the bucket using the GCP console.

    gsutil mb gs://gcp_monitoring_exercise
    gsutil cp reviews.csv gs://gcp_monitoring_exercise/reviews.csv
    gsutil cp bert_sentiment_model.pt gs://gcp_monitoring_exercise/bert_sentiment_model.pt
    
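    If you prefer to do it from Python instead of with gsutil, the equivalent with the google-cloud-storage client looks roughly like this (same bucket and file names as above):

    from google.cloud import storage

    client = storage.Client()
    bucket = client.create_bucket("gcp_monitoring_exercise")  # use client.bucket(...) if the bucket already exists
    for filename in ["reviews.csv", "bert_sentiment_model.pt"]:
        bucket.blob(filename).upload_from_filename(filename)
        print(f"Uploaded {filename} to {bucket.name}")
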
  2. Next we need to create a FastAPI application that takes a review as input and returns the predicted sentiment of the review. We provide a starting point for the application in the file below, which should be able to run as is.

    Starting point for sentiment analysis API
    sentiment_api_starter.py
    from contextlib import asynccontextmanager
    
    import torch
    import torch.nn as nn
    from fastapi import FastAPI, HTTPException
    from pydantic import BaseModel
    from transformers import BertModel, BertTokenizer
    
    # Define model and device configuration
    MODEL_NAME = "bert-base-cased"
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    
    class ReviewInput(BaseModel):
        """Define input data structure for the endpoint."""
    
        review: str
    
    
    class PredictionOutput(BaseModel):
        """Define output data structure for the endpoint."""
    
        sentiment: str
    
    
    class SentimentClassifier(nn.Module):
        """Sentiment Classifier class. Combines BERT model with a dropout and linear layer."""
    
        def __init__(self, n_classes, model_name=MODEL_NAME):
            super().__init__()
            self.bert = BertModel.from_pretrained(model_name)
            self.drop = nn.Dropout(p=0.3)
            self.out = nn.Linear(self.bert.config.hidden_size, n_classes)
    
        def forward(self, input_ids, attention_mask):
            """Forward pass of the model."""
            output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
            output = self.drop(output[1])
            return self.out(output)
    
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Load the model and tokenizer when the app starts and clean up when the app stops."""
        global model, tokenizer, class_names
        model = SentimentClassifier(n_classes=3)
        model.load_state_dict(torch.load("bert_sentiment_model.pt", map_location=device))
        model = model.to(device)
        model.eval()
        tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)
        class_names = ["negative", "neutral", "positive"]
        print("Model and tokenizer loaded successfully")
    
        yield
    
        del model, tokenizer
    
    
    # Initialize FastAPI app
    app = FastAPI(lifespan=lifespan)
    
    
    # Prediction endpoint
    @app.post("/predict", response_model=PredictionOutput)
    async def predict_sentiment(review_input: ReviewInput):
        """Predict sentiment of the input text."""
        try:
            # Encode input text
            encoding = tokenizer.encode_plus(
                review_input.review,
                add_special_tokens=True,
                max_length=160,
                return_token_type_ids=False,
                padding="max_length",
                return_attention_mask=True,
                return_tensors="pt",
            )
    
            input_ids = encoding["input_ids"].to(device)
            attention_mask = encoding["attention_mask"].to(device)
    
            # Model prediction
            with torch.no_grad():
                outputs = model(input_ids, attention_mask)
                _, prediction = torch.max(outputs, dim=1)
                sentiment = class_names[prediction]
    
            return PredictionOutput(sentiment=sentiment)
    
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e
    
    1. Confirm that you can run the application by running the following command in the terminal

      uvicorn sentiment_api_starter:app --reload
      

      You need the model file saved in the same directory as the application for it to run. Write a small client.py script that calls the application with a review and prints the predicted sentiment.

      Solution
      import requests
      
      url = "http://localhost:8000/predict"
      review = "This is a great app, I love it!"
      response = requests.post(url, json={"review": review})
      print(response.json())
      
    2. Next, we need to extend the application in two ways. First, instead of loading the model from our local computer, it should load it from the bucket we just uploaded the model to. Secondly, we need to save the request data and the predicted label to the cloud. Normally this would be best suited for a database, but we are just going to save to the same bucket as the model. We just need to make sure each request is saved under a unique name (e.g. the time and date of the request). Implement both of these functionalities in the application. To interact with GCP buckets in Python, you should install the google-cloud-storage package if you have not already done so.

      pip install google-cloud-storage
      
      Solution
      sentiment_api.py
      import datetime
      import json
      import os
      from contextlib import asynccontextmanager
      
      import torch
      import torch.nn as nn
      from fastapi import BackgroundTasks, FastAPI, HTTPException
      from google.cloud import storage
      from pydantic import BaseModel
      from transformers import BertModel, BertTokenizer
      
      # Define model and device configuration
      BUCKET_NAME = "gcp_monitoring_exercise"
      MODEL_NAME = "bert-base-cased"
      MODEL_FILE_NAME = "bert_sentiment_model.pt"
      device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
      
      
      class ReviewInput(BaseModel):
          """Define input data structure for the endpoint."""
      
          review: str
      
      
      class PredictionOutput(BaseModel):
          """Define output data structure for the endpoint."""
      
          sentiment: str
      
      
      class SentimentClassifier(nn.Module):
          """Sentiment Classifier class. Combines BERT model with a dropout and linear layer."""
      
          def __init__(self, n_classes, model_name=MODEL_NAME):
              super().__init__()
              self.bert = BertModel.from_pretrained(model_name)
              self.drop = nn.Dropout(p=0.3)
              self.out = nn.Linear(self.bert.config.hidden_size, n_classes)
      
          def forward(self, input_ids, attention_mask):
              """Forward pass of the model."""
              output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
              output = self.drop(output[1])
              return self.out(output)
      
      
      @asynccontextmanager
      async def lifespan(app: FastAPI):
          """Load the model and tokenizer when the app starts and clean up when the app stops."""
          global model, tokenizer, class_names
          if "bert_sentiment_model.pt" not in os.listdir():
              download_model_from_gcp()  # Download the model from GCP
          model = SentimentClassifier(n_classes=3)
          model.load_state_dict(torch.load("bert_sentiment_model.pt", map_location=device))
          model = model.to(device)
          model.eval()
          tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)
          class_names = ["negative", "neutral", "positive"]
          print("Model and tokenizer loaded successfully")
      
          yield
      
          del model, tokenizer
      
      
      # Initialize FastAPI app
      app = FastAPI(lifespan=lifespan)
      
      
      def download_model_from_gcp():
          """Download the model from GCP bucket."""
          client = storage.Client()
          bucket = client.bucket(BUCKET_NAME)
          blob = bucket.blob(MODEL_FILE_NAME)
          blob.download_to_filename(MODEL_FILE_NAME)
          print(f"Model {MODEL_FILE_NAME} downloaded from GCP bucket {BUCKET_NAME}.")
      
      
      # Save prediction results to GCP
      def save_prediction_to_gcp(review: str, outputs: list[float], sentiment: str):
          """Save the prediction results to GCP bucket."""
          client = storage.Client()
          bucket = client.bucket(BUCKET_NAME)
          time = datetime.datetime.now(tz=datetime.UTC)
          # Prepare prediction data
          data = {
              "review": review,
              "sentiment": sentiment,
              "probability": outputs,
              "timestamp": datetime.datetime.now(tz=datetime.UTC).isoformat(),
          }
          blob = bucket.blob(f"prediction_{time}.json")
          blob.upload_from_string(json.dumps(data))
          print("Prediction saved to GCP bucket.")
      
      
      # Prediction endpoint
      @app.post("/predict", response_model=PredictionOutput)
      async def predict_sentiment(review_input: ReviewInput, background_tasks: BackgroundTasks):
          """Predict sentiment of the input text."""
          try:
              # Encode input text
              encoding = tokenizer.encode_plus(
                  review_input.review,
                  add_special_tokens=True,
                  max_length=160,
                  return_token_type_ids=False,
                  padding="max_length",
                  return_attention_mask=True,
                  return_tensors="pt",
              )
      
              input_ids = encoding["input_ids"].to(device)
              attention_mask = encoding["attention_mask"].to(device)
      
              # Model prediction
              with torch.no_grad():
                  outputs: torch.Tensor = model(input_ids, attention_mask)
                  _, prediction = torch.max(outputs, dim=1)
                  sentiment = class_names[prediction]
      
              background_tasks.add_task(save_prediction_to_gcp, review_input.review, outputs.softmax(-1).tolist(), sentiment)
      
              return PredictionOutput(sentiment=sentiment)
      
          except Exception as e:
              raise HTTPException(status_code=500, detail=str(e)) from e
      
    3. You should confirm that the application is working locally before moving on. You can do this by running the following command in the terminal

      uvicorn sentiment_api:app --reload
      

      And use the same client.py script as before to confirm that the application is working. You should also check that the data is saved to the bucket.
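
      To check that predictions are actually landing in the bucket, you can list the stored files, for example with a small snippet like this (bucket name as in the solution above):

      from google.cloud import storage

      client = storage.Client()
      for blob in client.bucket("gcp_monitoring_exercise").list_blobs(prefix="prediction_"):
          print(blob.name, blob.updated)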

    4. Write a small Dockerfile that containerizes the application

      Solution
      sentiment_api.dockerfile
      FROM python:3.11-slim
      
      WORKDIR /app
      
      RUN pip install fastapi uvicorn torch transformers google-cloud-storage pydantic --no-cache-dir
      
      COPY sentiment_api.py .
      
      EXPOSE $PORT
      
      CMD exec uvicorn sentiment_api:app --port $PORT --host 0.0.0.0 --workers 1
      

      which can be built by running the following command

      docker build -f sentiment_api.dockerfile -t sentiment_api:latest .
      
    5. Deploy the container to cloud run and confirm that the application still runs as expected.

      Solution

      The following four commands should be able to deploy the application to GCP cloud run. Make sure to replace <location>, <project-id> and <repo-name> with the appropriate values.

      gcloud artifacts repositories create <repo-name> --repository-format=docker --location=<location>
      docker tag sentiment_api:latest <location>-docker.pkg.dev/<project-id>/<repo-name>/sentiment_api:latest
      docker push <location>-docker.pkg.dev/<project-id>/<repo-name>/sentiment_api:latest
      gcloud run deploy sentiment-api \
          --image <location>-docker.pkg.dev/<project-id>/<repo-name>/sentiment_api:latest \
          --region <region> --allow-unauthenticated
      
    6. Make sure that the application still works by sending a couple of requests to the deployed application and checking that the request/response data is correctly saved to the bucket.

      Solution

      To get the url of the deployed service you can run the following command

      gcloud run services describe sentiment-api --format 'value(status.url)'
      

      which can then be used in the client.py script to call the deployed service, for example (the URL below is a placeholder for your own service URL):
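
      import requests

      url = "https://sentiment-api-<hash>-<region>.a.run.app/predict"  # placeholder, replace with your service URL
      response = requests.post(url, json={"review": "This is a great app, I love it!"}, timeout=30)
      print(response.json())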

  3. We now have a working application that we are ready to monitor for data drift in real time. We therefore need to write a FastAPI application that takes in the training data and the predicted data and runs Evidently to check if the data or the labels have drifted. We again provide a starting point for the application below.

    sentiment_monitoring_starter.py
    from pathlib import Path
    
    import anyio
    import nltk
    import pandas as pd
    from evidently.metric_preset import TargetDriftPreset, TextEvals
    from evidently.report import Report
    from fastapi import FastAPI
    from fastapi.responses import HTMLResponse
    
    nltk.download("words")
    nltk.download("wordnet")
    nltk.download("omw-1.4")
    
    
    def run_analysis(reference_data: pd.DataFrame, current_data: pd.DataFrame):
        """Run the analysis and return the report."""
        text_overview_report = Report(metrics=[TextEvals(column_name="content"), TargetDriftPreset(columns=["sentiment"])])
        text_overview_report.run(reference_data=reference_data, current_data=current_data)
        text_overview_report.save_html("monitoring.html")
    
    
    def lifespan(app: FastAPI):
        """Load the data and class names before the application starts."""
        global training_data, class_names
        training_data = pd.read_csv("reviews.csv")
    
        def to_sentiment(rating):
            """Convert rating to sentiment class."""
            rating = int(rating)
            if rating <= 2:
                return 0  # Negative
            if rating == 3:
                return 1  # Neutral
            return 2  # Positive
    
        training_data["sentiment"] = training_data.score.apply(to_sentiment)
        class_names = ["negative", "neutral", "positive"]
    
        yield
    
        del training_data, class_names
    
    
    app = FastAPI(lifespan=lifespan)
    
    
    def download_files(n: int = 5) -> None:
        """Download the N latest prediction files from the GCP bucket."""
    
    
    def load_latest_files(directory: Path, n: int) -> pd.DataFrame:
        """Fetch latest data from the database."""
        download_files(n=n)
    
    
    @app.get("/report")
    async def get_report(n: int = 5):
        """Generate and return the report."""
        prediction_data = load_latest_files(Path("."), n=n)
        run_analysis(training_data, prediction_data)
    
        async with await anyio.open_file("monitoring.html", encoding="utf-8") as f:
            html_content = await f.read()
    
        return HTMLResponse(content=html_content, status_code=200)
    

    Look over the script and make sure you understand what kind of features we are going to monitor.

    Solution

    The provided starting script makes use of two presets from Evidently: TextEvals and TargetDriftPreset. The first extracts descriptive text statistics (like number of words, average word length etc.) from the review text, and the second runs target drift detection on the predicted labels.

    1. The script is missing the implementation of two key functions: download_files(n: int) and load_latest_files(directory: Path, n: int), which together should fetch the latest n predictions from the bucket. Implement these functions in the script.

      Solution
      sentiment_monitoring.py
      import json
      import os
      from pathlib import Path
      
      import anyio
      import nltk
      import pandas as pd
      from evidently.metric_preset import TargetDriftPreset, TextEvals
      from evidently.report import Report
      from fastapi import FastAPI
      from fastapi.responses import HTMLResponse
      from google.cloud import storage
      
      nltk.download("words")
      nltk.download("wordnet")
      nltk.download("omw-1.4")
      
      BUCKET_NAME = "gcp_monitoring_exercise"
      
      
      def to_sentiment(rating):
          """Convert rating to sentiment class."""
          rating = int(rating)
          if rating <= 2:
              return 0  # Negative
          if rating == 3:
              return 1  # Neutral
          return 2  # Positive
      
      
      def sentiment_to_numeric(sentiment: str) -> int:
          """Convert sentiment class to numeric."""
          if sentiment == "negative":
              return 0
          if sentiment == "neutral":
              return 1
          return 2
      
      
      def run_analysis(reference_data: pd.DataFrame, current_data: pd.DataFrame) -> None:
          """Run the analysis and return the report."""
          text_overview_report = Report(metrics=[TextEvals(column_name="content"), TargetDriftPreset(columns=["sentiment"])])
          text_overview_report.run(reference_data=reference_data, current_data=current_data)
          text_overview_report.save_html("monitoring.html")
      
      
      def lifespan(app: FastAPI):
          """Load the data and class names before the application starts."""
          global training_data, class_names
          training_data = pd.read_csv("reviews.csv")
          training_data["sentiment"] = training_data.score.apply(to_sentiment)
          training_data["target"] = training_data["sentiment"]  # evidently expects the target column to be named "target"
          class_names = ["negative", "neutral", "positive"]
      
          yield
      
          del training_data, class_names
      
      
      app = FastAPI(lifespan=lifespan)
      
      
      def load_latest_files(directory: Path, n: int) -> pd.DataFrame:
          """Load the N latest prediction files from the directory."""
          # Download the latest prediction files from the GCP bucket
          download_files(n=n)
      
          # Get all prediction files in the directory
          files = directory.glob("prediction_*.json")
      
          # Sort files based on when they where created
          files = sorted(files, key=os.path.getmtime)
      
          # Get the N latest files
          latest_files = files[-n:]
      
          # Load or process the files as needed
          reviews, sentiment = [], []
          for file in latest_files:
              with file.open() as f:
                  data = json.load(f)
                  reviews.append(data["review"])
                  sentiment.append(sentiment_to_numeric(data["sentiment"]))
          dataframe = pd.DataFrame({"content": reviews, "sentiment": sentiment})
          dataframe["target"] = dataframe["sentiment"]
          return dataframe
      
      
      def download_files(n: int = 5) -> None:
          """Download the N latest prediction files from the GCP bucket."""
          bucket = storage.Client().bucket(BUCKET_NAME)
          blobs = list(bucket.list_blobs(prefix="prediction_"))  # materialize the iterator so we can sort it
          blobs.sort(key=lambda x: x.updated, reverse=True)
          latest_blobs = blobs[:n]
      
          for blob in latest_blobs:
              blob.download_to_filename(blob.name)
      
      
      @app.get("/report", response_class=HTMLResponse)
      async def get_report(n: int = 5):
          """Generate and return the report."""
          prediction_data = load_latest_files(Path("."), n=n)
          run_analysis(training_data, prediction_data)
      
          async with await anyio.open_file("monitoring.html", encoding="utf-8") as f:
              html_content = await f.read()
      
          return HTMLResponse(content=html_content, status_code=200)
      
    2. Test out the script locally. This can be done by downloading a couple of the request/response files from the bucket and running the script on this data, for example as sketched below.
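
      A minimal sketch of such a local test (it assumes reviews.csv is in your working directory and that the bucket already contains a few prediction files):

      import requests
      from google.cloud import storage

      # grab a few stored predictions so load_latest_files has something to read
      bucket = storage.Client().bucket("gcp_monitoring_exercise")
      for blob in list(bucket.list_blobs(prefix="prediction_"))[:5]:
          blob.download_to_filename(blob.name)

      # with the app running locally (uvicorn sentiment_monitoring:app --reload),
      # request a report and save it for inspection in a browser
      response = requests.get("http://localhost:8000/report", params={"n": 5}, timeout=120)
      with open("local_report.html", "w", encoding="utf-8") as f:
          f.write(response.text)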

    3. Write a Dockerfile that containerizes the monitoring application

      Solution
      sentiment_monitoring.dockerfile
      FROM python:3.11-slim
      
      WORKDIR /app
      
      RUN pip install fastapi uvicorn nltk evidently google-cloud-storage --no-cache-dir
      
      COPY sentiment_monitoring.py .
      
      EXPOSE $PORT
      
      CMD exec uvicorn sentiment_monitoring:app --port $PORT --host 0.0.0.0 --workers 1
      
    4. Deploy the monitoring application to cloud run and confirm that the application returns a monitoring report when asked for it.

      Solution
      docker build -f sentiment_monitoring.dockerfile -t sentiment_monitoring:latest .
      docker tag sentiment_monitoring:latest \
          <location>-docker.pkg.dev/<project-id>/<repo-name>/sentiment_monitoring:latest
      docker push <location>-docker.pkg.dev/<project-id>/<repo-name>/sentiment_monitoring:latest
      gcloud run deploy sentiment-monitoring \
          --image <location>-docker.pkg.dev/<project-id>/<repo-name>/sentiment_monitoring:latest \
          --region <region> --allow-unauthenticated
      
  4. We are now finally ready to test our services. Since we need to observe some long-term behavior, this part may take some time to run, depending on how exactly you have configured the client. Below we have implemented a client script that is meant to call our service.

    Client script for sentiment analysis model

    sentiment_client.py
    import argparse
    import random
    import time
    
    import requests
    
    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
        parser.add_argument("--url", type=str, default="http://localhost:8000/predict")
        parser.add_argument("--wait_time", type=int, default=5)
        parser.add_argument("--max_iterations", type=int, default=1000)
        args = parser.parse_args()
    
        reviews = [
            "This app is fantastic! I use it every day.",
            "I enjoy using this app, it's very helpful.",
            "It's a decent app, nothing extraordinary but useful.",
            "This app is okay, but it could be improved.",
            "I'm not very impressed with this app.",
            "This app is not meeting my expectations.",
        ]
    
        negative_phrases = [
            "It’s getting frustrating to use.",
            "There are so many bugs now.",
            "The app crashes often and I am really disappointed.",
            "I think I'm going to stop using this app soon.",
            "It has become completely unusable.",
        ]
    
        count = 0
        while count < args.max_iterations:
            review = random.choice(reviews)
            negativity_probability = min(count / args.max_iterations, 1.0)
    
            updated_review = review
            for phrase in negative_phrases:
                if random.random() < negativity_probability:
                    updated_review += " " + phrase
    
            response = requests.post(args.url, json={"review": updated_review}, timeout=10)
            print(f"Iteration {count}, Sent review: {updated_review}, Response: {response.json()}")
            time.sleep(args.wait_time)
            count += 1
    
    1. What does the client script do?

      Solution

      The client script will iteratively call our deployed sentiment analysis service every wait_time seconds. In each iteration it:

      • Randomly samples a review from a list of positive, neutral and negative reviews
      • Randomly adds negative phrases to the review. Each phrase is added if a uniformly random number is lower than negativity_probability = min(count / args.max_iterations, 1.0), meaning that it becomes more and more likely that the negative phrases are added as the number of iterations increases.
      • Sends the review to the sentiment analysis service and prints the response (the service itself saves the request/response data to the bucket).
    2. Run the client script for 1000 iterations. What happens to the distribution of the reviews over time? Does the data drift?

That ends the module on detection of data drifting, data quality etc. We have a couple of final points to make before we end the module:

  • Monitoring of machine learning applications is an extremely hard discipline, because it is not clear-cut when we should actually respond to features/targets beginning to drift and when it is fine to let the system run as is. What kind of rules should be implemented comes down to the individual application.

  • The cloud setup we have developed is very simple and not meant for production. In a real-world scenario we would not have deployed our monitoring application as an endpoint that generates a report, but rather have it return the test results in a JSON format that can be ingested into more complex monitoring systems, where we can show how drift scores develop over time. You will learn more about this in the next module.

  • The tools presented here are in no way complete and are especially limited in one way: they only consider the marginal distribution of the data. Every analysis we have done has been on the distribution per feature (the marginal distribution); however, as the image below shows, it is possible for data to have drifted to another distribution while the marginals stay approximately the same.

    Image

    There are methods such as Maximum Mean Discrepancy (MMD) tests that are able to test multivariate distributions, which you are free to dive into; a minimal sketch of such a test is shown below. The general recommendation is to always consider multiple features when making decisions regarding your deployed applications.
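
    The sketch below runs the test on synthetic data: the two samples have the same standard-normal marginals, but in the second sample the features are strongly correlated, which the kernel MMD statistic picks up while per-feature comparisons would not. In practice you would compare the statistic against a permutation-based threshold instead of eyeballing the numbers.

    import numpy as np


    def rbf_kernel(x: np.ndarray, y: np.ndarray, gamma: float = 1.0) -> np.ndarray:
        """Gaussian (RBF) kernel matrix between two sets of samples."""
        sq_dists = np.sum(x**2, axis=1)[:, None] + np.sum(y**2, axis=1)[None, :] - 2 * x @ y.T
        return np.exp(-gamma * sq_dists)


    def mmd_squared(x: np.ndarray, y: np.ndarray, gamma: float = 1.0) -> float:
        """Biased estimate of the squared maximum mean discrepancy between samples x and y."""
        return rbf_kernel(x, x, gamma).mean() + rbf_kernel(y, y, gamma).mean() - 2 * rbf_kernel(x, y, gamma).mean()


    rng = np.random.default_rng(0)
    reference = rng.multivariate_normal(mean=[0, 0], cov=np.eye(2), size=500)
    baseline = rng.multivariate_normal(mean=[0, 0], cov=np.eye(2), size=500)
    # same standard-normal marginals as above, but the two features are now strongly correlated
    drifted = rng.multivariate_normal(mean=[0, 0], cov=[[1.0, 0.95], [0.95, 1.0]], size=500)

    print("MMD^2, same distribution:", mmd_squared(reference, baseline))
    print("MMD^2, drifted joint:    ", mmd_squared(reference, drifted))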

Finally, we want to stress that monitoring is a very active field of research and that there are many more tools and frameworks that can be used for monitoring.