Logging: Send Data to Gantry

Logging is how you get data into Gantry. Once it's there, you can begin analyzing it! There are many ways to get data into Gantry; this page describes all of them except for Logging LLM Completion Data. It's expected that only one method below best fits your use case.

Logging overview

Gantry accumulates records corresponding to predictions (inputs and outputs) and feedback (the ground truth output). Feedback helps the quality of predictions. Predictions can also be enriched with tags and projections (derived values) to provide a clearer picture of model behavior.

The terms introduced above will be described in detail in the sections that follow. Explanations will be in the context of the following record from Gantry:

In this example describes a text generation model with a single input and output. The table below explains the each column:

ColumnDescription
record_keyA unique and stable identifier for this prediction. This key can be used to apply feedback at any point in time.
application_nameThe name of this application within Gantry. Roughly speaking, each application has a corresponding "infinite DataFrame" consisting of rows like this.
tags.envAn example of how to use tags to indicate this record was captured in production.
tags.user_typeAn example of enriching a prediction with data that might not be an input, but helps add context to how the model impacts users.
inputs.promptThe prompt provided by the user, the model input.
outputs.generationThe output produced by text generation.
feedback.thumbs_upAn example of feedback that is not "ground truth", but merely the opinion of the user. This type of feedback is well suited for assessing how well users are receiving the model's predictions.
projections.word_countAn example of using projections to "project" a higher dimensional input, raw text, into a scalar. This helps understand the model's behavior more systematically.

There are 3 main ways to log data to Gantry: via stream, via batch, and via data connector. The section below describes how to log the first few columns in the record displayed above.

Regardless of the logging type, Gantry needs to be initialized.

Note that Gantry is global
The Gantry module is initialized globally for per Python process. That means all logging calls in a process need to share an API key. Upon initialization, you can instruct Gantry not to send data in the background. This is only recommended if you're working in an async environment, as it might slow down your processes otherwise.

import gantry

gantry.init(
    api_key="YOUR_API_KEY",
)

📘

Logging Media Data

The process for logging image and audio data is different. Skip to that documentation here.

Logging via stream and batch are the two simplest ways to get started with logging in Gantry. Stream is the default upload type of application.log. A batch upload can be invoked by setting the as_batch parameter to True.

Logging via stream is a non-blocking background process. Logging via batch is a blocking synchronous process designed for larger uploads. Logging via batch allows you to track your ingestion job on the Jobs tab in the UI. They both support all of the same parameters.

Stream

application = gantry.get_application(GANTRY_APP_NAME)
# OR gantry.create_application(GANTRY_APP_NAME) if your app does not yet exist

inputs = {
  "prompt": "I read the news today oh boy",
}

outputs = {
  "generation": "About a lucky man who made the grade"
}

# Note: inputs & outputs can also be a list of pandas DataFrames

application.log(
  inputs=[inputs], 
  outputs=[outputs],
)

When you log data via stream, you can know the request was successful from the INFO logs:

2023-06-08 09:52:07,768 gantry.logger.stores INFO     Sending batch synchronously

Batch

application = gantry.get_application(GANTRY_APP_NAME)
# OR gantry.create_application(GANTRY_APP_NAME) if your app does not yet exist

inputs: pd.DataFrame = ...  # Contains the inputs to your model
outputs: pd.DataFrame = ... # Contains your model's predictions
  
# Note: inputs & outputs can also be a list of dictionaries

application.log(
  inputs=[inputs],
  outputs=[outputs],
  as_batch=True,
)

When you log data via batch, you can see it on the jobs tab. You also know the request was successful from the INFO logs:

2023-06-08 10:29:22,166 gantry.logger.client INFO     Initializing upload to Gantry
2023-06-08 10:29:23,710 gantry.logger.client INFO     Starting Gantry Ingestion
2023-06-08 10:29:23,987 gantry.logger.utils INFO     Track your batch at https://app.gantry.io/applications/distracted-driving/jobs
2023-06-08 10:29:23,989 gantry.logger.utils INFO     Look for batch id: 0405de00-fa7c-4d2c-a17f-eff746949366

Gantry supports logging records directly from your databases with data connectors. Currently the only supported data connectors are Snowflake and S3.

Using the S3 data connector

To log data from S3, bucket file structure must follow the Hive format: s3://<bucket-name>/<some-path-prefix-if-necessary>/year=2023/month={4 or 04}/day=24/hour=16/some_file.csv. Note that the date time here is the processing time, not the event time.

Register the secret with privileges that allow bucket read access:

  1. Create json file with credentials
{
    "s3_bucket_name": "MY_BUCKET",
    "aws_access_key_id": "xxx",
    "aws_secret_access_key": "xxx",
}
  1. Register file with Gantry
$ export GANTRY_API_KEY = "YOUR_API_KEY"

$ gantry-cli secret create \
    --name "MY_SECRET" \
    --secret-type="AWS" \
    --secret-file="./s3_credentials.json"
  1. Register a data connector:
gantry-cli data-connector create \
  --name="my-s3-connector" \
  --connection-type="S3" \
  --secret-name="MY_SECRET" \
  --description="S3 data connector for MY_APP prediction pipeline" \
  --options='{"s3_bucket_name":"MY_BUCKET","s3_filetype": "jsonl","s3_path_prefix": "MY_APP/prediction"}'
  1. Trigger log prediction events, this will create a prediction pipeline:
from gantry.logger.types import Schedule, ScheduleFrequency, ScheduleOptions
import gantry
from typing import List

gantry.init()

inputs: List[str] = ["prompt"]
outputs: List[str] = ["generation"]
timestamp: str = "updated_at"

gantry.log_from_data_connector(
  application="sweetgreen-sample-sameena",
  source_data_connector_name="my-s3-connector",
  timestamp=timestamp,
  inputs=inputs,
  outputs=outputs,
  row_tags=["version"],
  global_tags={"env": "development"},
  schedule=Schedule(
    frequency=ScheduleFrequency.EVERY_HOUR,
    options=ScheduleOptions(
        delay_time=180,
    )
  )
)

Once your data is logged, you should go into your application schema and double check that the data types have been interpreted correctly. This ensures you can perform proper analysis on your data.

Using the Snowflake data connector

Register the secret with privileges granted to the table or view of the source database:

$ export GANTRY_API_KEY = "YOUR_API_KEY"

$ gantry-cli secret create \
    --name "MY_SECRET" \
    --secret-type="SNOWFLAKE_CONN_STR" \
    --secret-file="./credentials.json"

Your secret may look like this:

{
    "server_name": "SERVE_NAME",
    "user_name": "PASSWORD",
    "password": "PASSWORD",
    "warehouse_name": "DEV" // Specific for Snowflake
}

Register the data connector:

$ gantry-cli data-connector create \
    --name "my-snowflake-connector" \
    --connection-type="SNOWFLAKE" \
    --database-name="MY_DB" \
    --secret-name="MY_SNOWFLAKE_SECRET" \
    --description="Data connector to log records from my snowflake database" \
    --options='{"schema_name": "MY_SCHEMA","table_name": "MY_TABLE"}'

Submit the logging request to Gantry via the SDK:

inputs: List[str] = ["prompt"]
outputs: List[str] = ["generation"]
timestamp: str = "updated_at"
  
 gantry.log_from_data_connector(
    application="my-awesome-app",
    source_data_connector_name="my-snowflake-connector",
    timestamp=timestamp,
    inputs=inputs,
    outputs=outputs,
)

Scheduling requests

The below example shows a request that will trigger every 8 hours from the indicated start date. The data that will be ingested will be filtered by the column name specified in watermark_key. delay_time defined in ScheduleOptions ensures that late-arriving data in the source table/view will be included up to the specified number of seconds.

from gantry.logger.types import Schedule, ScheduleFrequency, ScheduleType, ScheduleOptions

gantry.log_from_data_connector(
    application="my-awesome-app",
    source_data_connector_name="my-snowflake-connector",
    timestamp=timestamp,
    inputs=inputs,
    outputs=outputs,
    global_tags=tags,
    schedule=Schedule(
        start_on="2023-01-14T08:00:00.000000",
        frequency=ScheduleFrequency.EVERY_8_HOURS,
        type=ScheduleType.INCREMENTAL_APPEND,
        options=ScheduleOptions(watermark_key=timestamp, delay_time=300),
    )
)

Logging Media Data

Prerequisites

Image and audio data must be stored in a GCS or S3 bucket. That bucket needs to be registered with Gantry if it is private.

Logging images and audio to Gantry

import datetime
import gantry

inputs = [
      {
# Note: Gantry also supports presigned URLs or the normal "https://" url for public objects
          "s3_image": "s3://{bucket_name}/images/kitty.jpeg",
          "gcs_image": "gs://{bucket_name}/kitty.jpeg",
          "gcs_audio": "gs://{bucket_name}/audio_0.wav",
          "s3_audio": "s3://{bucket_name}/audio/audio_0.wav",
      },
      {
          "s3_image": "s3://{bucket_name}/images/kitty.jpeg",
          "gcs_image": "gs://{bucket_name}/kitty.jpeg",
          "gcs_audio": "gs://{bucket_name}/audio_0.wav",
          "s3_audio": "s3://{bucket_name}/audio/audio_0.wav",
      }]
outputs = [
      {
          "predict": "demo output",
      },
      {
          "predict": "demo output",
      }]
join_keys = [
      "4741fb17-5942-4dba-9057-6ddf43237e0a",
      "69bd0afc-115a-4394-8740-cf173fb7be3b",
  ]
timestamps = [
      datetime.datetime(2023,3, 7, 8, 5, 3, 125524),
      datetime.datetime(2023,3, 7, 8, 5, 4, 661582),
  ]
  
gantry.init(api_key=GANTRY_API_KEY)
application = gantry.get_application(GANTRY_APP_NAME)
application.log(
      inputs=inputs,
      outputs=outputs,
      join_keys=join_keys,
      timestamps=timestamps,
      as_batch=True,
  )

Debugging Logging Issues

  • If you're working in a heavily async environment and not seeing logging errors, but your data is not showing up correctly, try initializing Gantry with send_in_background set to False:
gantry.init(
    api_key="YOUR_API_KEY",
    send_in_background=False
)
  • Gantry logging does not throw errors so that it is consistent with other logging tools. To see status updates and errors, look at the python INFO and ERROR logs.