Curators

Auto-populate datasets with filtered data

Use Curators to collect specified data at specified intervals:

  1. Ensure you're always training on the latest data
  2. Set up an automated job with a single SDK call; Gantry takes care of the rest
  3. Consume your data in flexible Datasets

The curation process leverages the following concepts:

  • selection, which determines what data is collected from a production stream
  • scheduling, which defines how often data is collected

When a Curator job assembles data, the data is saved in a Dataset under a version number associated with that job.

At a high level the Curator looks like this:

Concepts

Selecting data

Data selectors allow for encoding domain knowledge and insights about a problem to curate training sets.

A Selector takes a list of filters and combines them with a method and a limit. The method orders the filtered results, and the limit keeps the top n of them. Limits allow you to encode a labeling budget directly into job definitions.

A Filter is a boolean function that can be applied to a field. Filtering is an AND operation: a Curator keeps a data point only if every filter evaluates to true for it, and discards it otherwise.
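To make the filter, method, and limit semantics concrete, here is a minimal plain-Python sketch. It does not use the Gantry SDK: the `select` helper, the flat field names, and the sample records are all hypothetical, and the inclusive lower bound on income is an assumption made for illustration only.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]
Predicate = Callable[[Record], bool]


def select(records: List[Record], filters: List[Predicate],
           sort_field: str, limit: int) -> List[Record]:
    """Hypothetical model of a selector: AND the filters, order, then cap."""
    # AND semantics: a record survives only if every filter returns True.
    kept = [r for r in records if all(f(r) for f in filters)]
    # Ascending order puts the lowest values of sort_field first.
    kept.sort(key=lambda r: r[sort_field])
    # The limit keeps at most `limit` records from the front of the ordering.
    return kept[:limit]


# Illustrative records only; not real Gantry data.
records = [
    {"id": 1, "college_degree": True, "income": 9_000, "satisfaction": 3},
    {"id": 2, "college_degree": False, "income": 9_500, "satisfaction": 1},
    {"id": 3, "college_degree": True, "income": 8_000, "satisfaction": 2},
    {"id": 4, "college_degree": True, "income": 5_000, "satisfaction": 0},
]

filters = [
    lambda r: r["college_degree"] is True,
    lambda r: r["income"] >= 7_500,
]

selected = select(records, filters, sort_field="satisfaction", limit=1)
print([r["id"] for r in selected])  # prints [3]
```

Record 2 fails the degree filter and record 4 fails the income filter, so only records 1 and 3 are ordered, and the limit of 1 keeps the one with the lowest satisfaction score.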

The following example defines filters that identify loan applicants who buy too much Sweetgreen. These filters are then passed to a Selector instructed to prioritize data points with the lowest job satisfaction scores and to select a maximum of 10 data points per execution:

from gantry.automations.curators.selectors import BoundsFilter, EqualsFilter, OrderedSampler, Selector

buys_too_much_sweetgreen = [
    EqualsFilter(field="inputs.college_degree", equals=True),
    BoundsFilter(field="inputs.credit_score", upper=5_000),
    BoundsFilter(field="inputs.job.income", lower=7_500),
]

dissatisfied_sweetgreen_addicts = Selector(
    method=OrderedSampler(
        field="inputs.job.satisfaction",
        sort="ascending",
    ),
    filters=buys_too_much_sweetgreen,
    limit=10,
)

This Selector can be used to specify a Curator. If multiple Selectors are passed to a Curator, any data point that satisfies at least one of them is passed to the curation job. Selectors are an OR operation.

from gantry.automations.curators import Curator

custom_curator = Curator(
    name="most_dissatisfied_sweetgreen_addicts",
    application_name=GANTRY_APP_NAME,
    selectors=[dissatisfied_sweetgreen_addicts],
)
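The AND-within-a-selector, OR-across-selectors rule can be sketched in plain Python. This is an illustrative model rather than Gantry code: each "selector" here is just a hypothetical list of predicates, the field names are made up, and ordering methods and limits are deliberately left out.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]
Predicate = Callable[[Record], bool]


def matches_any_selector(record: Record,
                         selectors: List[List[Predicate]]) -> bool:
    """True if the record passes ALL filters of AT LEAST ONE selector."""
    return any(all(f(record) for f in filters) for filters in selectors)


# Illustrative records only; not real Gantry data.
records = [
    {"id": "a", "account_age_days": 2, "satisfaction": 8},
    {"id": "b", "account_age_days": 40, "satisfaction": 1},
    {"id": "c", "account_age_days": 90, "satisfaction": 9},
    {"id": "d", "account_age_days": 1, "satisfaction": 0},
]

# Two hypothetical selectors, each reduced to its list of filters.
new_accounts = [lambda r: 0 <= r["account_age_days"] <= 5]
dissatisfied = [lambda r: r["satisfaction"] <= 2]

curated = [r["id"] for r in records
           if matches_any_selector(r, [new_accounts, dissatisfied])]
print(curated)  # prints ['a', 'b', 'd']
```

Record "d" satisfies both selectors but appears once, and record "c" satisfies neither, so it is dropped.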

Scheduling the job

Scheduling specifies how often the job should run. Each run creates a new version in the target dataset.

import datetime
import gantry
from gantry.automations import Automation
from gantry.automations.triggers import IntervalTrigger

# GANTRY_APP_NAME is the same application name used for the Curator above.
application = gantry.get_application(GANTRY_APP_NAME)

interval_trigger = IntervalTrigger(
    start_on=datetime.datetime(2022, 12, 31, 0, 0),
    interval=datetime.timedelta(days=1),
)
curator_automation = Automation(name="curator-automation", trigger=interval_trigger, action=custom_curator)
application.add_automation(curator_automation)

The interval parameter, defined as a timedelta object, dictates which time window each dataset version corresponds to. As soon as an interval ends, a curator runs its query and writes the resulting records to a new version in the target dataset.

Specifying a historical start_on with a datetime object will trigger Gantry to backfill every interval between the time specified and now. While the curator is enabled, it will then continue to create versions as intervals pass until it is disabled.

The interval and start_on parameters define a Trigger. Together, a Curator and a Trigger define an automation.
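The backfill behavior described above can be illustrated with standard-library `datetime` arithmetic. This is a rough sketch of the idea, not Gantry's implementation: the `completed_windows` helper and the fixed `now` value are hypothetical, and it assumes that only fully elapsed intervals produce a dataset version.

```python
import datetime
from typing import List, Tuple

Window = Tuple[datetime.datetime, datetime.datetime]


def completed_windows(start_on: datetime.datetime,
                      interval: datetime.timedelta,
                      now: datetime.datetime) -> List[Window]:
    """List every [start, end) window that has fully elapsed by `now`."""
    if interval <= datetime.timedelta(0):
        raise ValueError("interval must be positive")
    windows = []
    window_start = start_on
    # A window counts only once its end is at or before `now`.
    while window_start + interval <= now:
        windows.append((window_start, window_start + interval))
        window_start += interval
    return windows


start_on = datetime.datetime(2022, 12, 31, 0, 0)
interval = datetime.timedelta(days=1)
now = datetime.datetime(2023, 1, 3, 12, 0)  # fixed for reproducibility

windows = completed_windows(start_on, interval, now)
for begin, end in windows:
    print(begin.isoformat(), "->", end.isoformat())
```

With a historical `start_on` of 2022-12-31 and a one-day interval, three windows have elapsed by midday on 2023-01-03. The window that began on 2023-01-03 is still open, so under this sketch's assumption it would not yet correspond to a version.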

Example: Bounded Ranges

Here we reuse an example from the Tutorial. In that example, we wanted to gather predictions served to users of newly created accounts, i.e. we wanted to "bound" the account age field. To do this we used the BoundedRangeCurator, a convenient subclass of the Curator. Gantry provides many such "stock curators" that encapsulate common selection criteria to help streamline the creation of curators. In the case of the BoundedRangeCurator, this looks like:

import datetime
import gantry
from gantry.automations import Automation
from gantry.automations.curators import BoundedRangeCurator
from gantry.automations.triggers import IntervalTrigger

application = gantry.get_application(GANTRY_APP_NAME)

new_accounts_curator = BoundedRangeCurator(
    name=f"{GANTRY_APP_NAME}-account-age-curator",
    application_name=GANTRY_APP_NAME,
    limit=1000,
    bound_field="inputs.account_age_days",
    lower_bound=0,
    upper_bound=5,
)

interval_trigger = IntervalTrigger(
    start_on=datetime.datetime(2022, 1, 31, 0, 0),
    interval=datetime.timedelta(days=1),
)

curator_automation = Automation(
    name="curator-automation",
    trigger=interval_trigger,
    action=new_accounts_curator,
)

application.add_automation(curator_automation)

Let's review each parameter:

  • name: names the curator; for convenience, we extend our application name
  • application_name: ties the curator to an application
  • limit: caps the number of data points selected; here we assume we can afford to label 1000 data points per day
  • bound_field: we want newer accounts, so we specify the account_age_days field for bounding
  • lower_bound, upper_bound: in the tutorial we identified 0 to 5 days as the window for users experiencing degraded model performance
  • start_on: the data for the tutorial is from 2022, so we specified the start of the window for which we have data
  • interval: we want this curator to collect data once per day

That's all that's required for Gantry to transform your production data streams into versioned datasets that can be used for labeling, retraining, evaluation, or any other downstream process.

We can get a dataset from a curator directly. Note that curators are jobs that take time to run, so if the dataset is pulled too soon after curator creation, there won't be any data. You can tell if there's data by the commit message.

dataset = new_accounts_curator.get_curated_dataset()
dataset.list_versions()
dataset.pull()
