Curators
Auto-populate datasets with filtered data
Use Curators to collect specified data at specified intervals:
- Ensure you're always training on the latest data
- Set up an automated job with a single SDK call; Gantry takes care of the rest
- Consume your data in flexible Datasets
The curation process leverages the following concepts:
- selection, which determines what data is collected from a production stream
- scheduling, which defines how often data is collected
When a Curator job assembles data, the data is saved in a Dataset under a version number associated with that job.
At a high level the Curator looks like this:
Concepts
Selecting data
Data selectors allow for encoding domain knowledge and insights about a problem to curate training sets.
Selectors take a list of filters and combine them with method and limit criteria. Methods order the results, and limits select the top n of them. Limits allow you to encode a labeling budget directly into job definitions.
A Filter is a boolean function that can be applied to a field. Curators discard any data point for which at least one filter does not evaluate to true; in other words, filtering is an AND operation.
The following example defines filters that identify loan applicants who buy too much Sweetgreen. These filters are then passed to a Selector instructed to prioritize data points with the lowest job satisfaction scores and to select a maximum of 10 data points per execution:
from gantry.automations.curators.selectors import (
    BoundsFilter,
    EqualsFilter,
    OrderedSampler,
    Selector,
)

# Every filter must evaluate to true for a data point to be kept (AND).
buys_too_much_sweetgreen = [
    EqualsFilter(field="inputs.college_degree", equals=True),
    BoundsFilter(field="inputs.credit_score", upper=5_000),
    BoundsFilter(field="inputs.job.income", lower=7_500),
]

# Order by job satisfaction, lowest first, and keep at most 10 data points per run.
dissatisfied_sweetgreen_addicts = Selector(
    method=OrderedSampler(
        field="inputs.job.satisfaction",
        sort="ascending",
    ),
    filters=buys_too_much_sweetgreen,
    limit=10,
)
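To make the filter, method, and limit semantics concrete, here is a minimal plain-Python sketch that does not call the Gantry SDK. The helper names (equals, bounds, select), the flat field names, and the inclusive bounds are assumptions made for illustration; they are not Gantry's implementation.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]
Predicate = Callable[[Record], bool]


def equals(field: str, value: Any) -> Predicate:
    """Hypothetical stand-in for an equality filter on one field."""
    return lambda record: record.get(field) == value


def bounds(field: str, lower: float = float("-inf"), upper: float = float("inf")) -> Predicate:
    """Hypothetical stand-in for a bounds filter (inclusive bounds are an assumption)."""
    return lambda record: field in record and lower <= record[field] <= upper


def select(records: List[Record], filters: List[Predicate], sort_field: str,
           limit: int, ascending: bool = True) -> List[Record]:
    # AND: keep a record only if every filter accepts it.
    kept = [r for r in records if all(f(r) for f in filters)]
    # Method: order the surviving records by the chosen field.
    kept.sort(key=lambda r: r[sort_field], reverse=not ascending)
    # Limit: keep only the top n.
    return kept[:limit]


records = [
    {"id": 1, "college_degree": True, "credit_score": 700, "income": 9_000, "satisfaction": 3},
    {"id": 2, "college_degree": True, "credit_score": 650, "income": 8_000, "satisfaction": 1},
    {"id": 3, "college_degree": False, "credit_score": 720, "income": 12_000, "satisfaction": 0},
    {"id": 4, "college_degree": True, "credit_score": 710, "income": 5_000, "satisfaction": 2},
    {"id": 5, "college_degree": True, "credit_score": 690, "income": 10_000, "satisfaction": 2},
]

filters = [
    equals("college_degree", True),
    bounds("credit_score", upper=5_000),
    bounds("income", lower=7_500),
]

selected = select(records, filters, sort_field="satisfaction", limit=2)
selected_ids = [r["id"] for r in selected]
print(selected_ids)
```

Record 3 has the lowest satisfaction score but fails the degree filter, and record 4 fails the income filter, so neither can be selected regardless of ordering. The limit is applied last, after filtering and ordering.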
This Selector can be used to specify a Curator. If multiple Selectors are passed to a Curator, any data point that satisfies at least one of them is passed to the curation job. Selectors are an OR operation.
from gantry.automations.curators import Curator

# GANTRY_APP_NAME holds the name of your Gantry application.
custom_curator = Curator(
    name="most_dissatisfied_sweetgreen_addicts",
    application_name=GANTRY_APP_NAME,
    selectors=[dissatisfied_sweetgreen_addicts],
)
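The OR behavior across Selectors can be pictured as a union of each Selector's picks. The sketch below is plain Python with hypothetical selector functions; de-duplicating by an "id" key is an assumption made for this illustration, not documented Gantry behavior.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]
SelectorFn = Callable[[List[Record]], List[Record]]


def lowest_scores(records: List[Record]) -> List[Record]:
    """Hypothetical selector: the two records with the lowest score."""
    return sorted(records, key=lambda r: r["score"])[:2]


def flagged(records: List[Record]) -> List[Record]:
    """Hypothetical selector: every record marked as flagged."""
    return [r for r in records if r["flagged"]]


def curate(records: List[Record], selectors: List[SelectorFn]) -> List[Record]:
    # OR: a record is curated if at least one selector picks it.
    # Records picked by several selectors are kept once (assumption).
    seen = set()
    curated = []
    for selector in selectors:
        for record in selector(records):
            if record["id"] not in seen:
                seen.add(record["id"])
                curated.append(record)
    return curated


records = [
    {"id": "a", "score": 0.9, "flagged": True},
    {"id": "b", "score": 0.1, "flagged": False},
    {"id": "c", "score": 0.2, "flagged": True},
    {"id": "d", "score": 0.8, "flagged": False},
]

curated_ids = [r["id"] for r in curate(records, [lowest_scores, flagged])]
print(curated_ids)
```

Record "c" is picked by both selectors and record "d" by neither, so the curated set contains "a", "b", and "c".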
Scheduling the job
Scheduling specifies how often the job should run. Each run creates a new version in the target dataset.
import datetime
from gantry.automations import Automation
from gantry.automations.triggers import IntervalTrigger
interval_trigger = IntervalTrigger(
    start_on=datetime.datetime(2022, 12, 31, 0, 0),
    interval=datetime.timedelta(days=1),
)
curator_automation = Automation(name="curator-automation", trigger=interval_trigger, action=custom_curator)
# application = gantry.get_application(GANTRY_APP_NAME), as in the Bounded Ranges example
application.add_automation(curator_automation)
The interval parameter, defined as a timedelta object, dictates which time window each dataset version corresponds to. As soon as an interval ends, the curator runs its query and writes the resulting records to a new version in the target dataset.
Specifying a historical start_on with a datetime object will trigger Gantry to backfill every interval between the time specified and now. While the curator is enabled, it then continues to create versions as intervals pass, until it is disabled.
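As a rough illustration of how start_on and interval carve time into windows, the following plain-Python sketch lists the windows that have fully elapsed by a given moment. The function name completed_windows and the half-open [start, end) convention are assumptions for this sketch; Gantry's exact boundary handling is not specified here.

```python
import datetime
from typing import List, Tuple

Window = Tuple[datetime.datetime, datetime.datetime]


def completed_windows(start_on: datetime.datetime,
                      interval: datetime.timedelta,
                      now: datetime.datetime) -> List[Window]:
    """List every [start, end) window that has fully elapsed by `now`."""
    if interval <= datetime.timedelta(0):
        raise ValueError("interval must be positive")
    windows = []
    start = start_on
    # A window only counts once its end has passed.
    while start + interval <= now:
        windows.append((start, start + interval))
        start += interval
    return windows


start_on = datetime.datetime(2022, 12, 31, 0, 0)
interval = datetime.timedelta(days=1)
now = datetime.datetime(2023, 1, 3, 12, 0)  # fixed "now" so the example is reproducible

windows = completed_windows(start_on, interval, now)
for start, end in windows:
    print(f"{start:%Y-%m-%d %H:%M} -> {end:%Y-%m-%d %H:%M}")
```

With a historical start_on, each of these elapsed windows corresponds to one backfilled dataset version. The window that began on 2023-01-03 is still open at the chosen "now", so it is not listed yet.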
The interval and start_on parameters define a Trigger. Together, a Curator and a Trigger define an automation.
Example: Bounded Ranges
Here we reuse an example from the Tutorial. In that example, we wanted to gather predictions served to users of newly created accounts, i.e. we wanted to "bound" the account age field. To do this we used the BoundedRangeCurator, a convenient subclass of the Curator. Gantry provides many such "stock curators" that encapsulate common selection criteria to help streamline the creation of curators. In the case of the BoundedRangeCurator, this looks like:
import datetime

import gantry
from gantry.automations import Automation
from gantry.automations.curators import BoundedRangeCurator
from gantry.automations.triggers import IntervalTrigger

application = gantry.get_application(GANTRY_APP_NAME)

new_accounts_curator = BoundedRangeCurator(
    name=f"{GANTRY_APP_NAME}-account-age-curator",
    application_name=GANTRY_APP_NAME,
    limit=1000,
    bound_field="inputs.account_age_days",
    lower_bound=0,
    upper_bound=5,
)
interval_trigger = IntervalTrigger(
    start_on=datetime.datetime(2022, 1, 31, 0, 0),
    interval=datetime.timedelta(days=1),
)
curator_automation = Automation(
    name="curator-automation",
    trigger=interval_trigger,
    action=new_accounts_curator,
)
application.add_automation(curator_automation)
Let's review each parameter:
- name: names the curator; we conveniently extend our application name
- application_name: ties the curator to an application
- limit: let's assume we can afford to label 1000 datapoints per day
- bound_field: we want newer accounts, so we specify the account_age_days field for bounding
- lower_bound, upper_bound: in the tutorial we identified 0 to 5 days as the window for users experiencing degraded model performance
- start_on: the data for the tutorial is from 2022, so we specified the start of the window for which we have data
- interval: we want this curator to collect data once per day
That's all that's required for Gantry to transform your production data streams into versioned datasets that can be used for labeling, retraining, evaluation, or any other downstream process.
We can get a dataset from a curator directly. Note that curators are jobs that take time to run, so if the dataset is pulled too soon after curator creation, there won't be any data. You can tell if there's data by the commit message.
dataset = new_accounts_curator.get_curated_dataset()
dataset.list_versions()
dataset.pull()
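Because the curator job takes time to run, one option is to poll before pulling. The helper below is a generic plain-Python sketch; wait_until and its parameters are hypothetical names, and the readiness check is left to the caller (for example, a function that inspects the dataset's versions), since this sketch makes no assumption about what the SDK calls return.

```python
import time
from typing import Callable


def wait_until(ready: Callable[[], bool], timeout_s: float = 600.0, poll_s: float = 10.0) -> bool:
    """Call `ready` repeatedly until it returns True or `timeout_s` elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if ready():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)


# Stand-in readiness check that succeeds on the third call. In practice this
# would be replaced by a check against the curated dataset.
calls = {"n": 0}


def fake_ready() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


ok = wait_until(fake_ready, timeout_s=5.0, poll_s=0.01)
print(ok, calls["n"])
```

Returning False on timeout, rather than raising, lets the caller decide whether an empty dataset is an error or simply means the first interval has not finished yet.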