How to use a pipeline library to make your life easier
Recently I've been working with the team at 51Degrees on some geolocation things, and I realised that one of the tools we were using has a rather more general application than I at first imagined. So now I come, O reader, to tell you about it.
A fair amount of the things I build involve pulling data from multiple APIs. (I suspect this is true of a fair amount of the things you build too, O reader.) And what that involves is a bunch of juggling to handle querying those APIs, collecting the results together, working out what to do if one API call fails and the rest don't, adding caching layers, and so on. This is one of those insidious programming tasks where it looks easy, because you can write a prototype in five minutes, and then gradually over time you add robustness every time a bug's discovered. Think of it like making an HTTP request. This is not hard, in concept; you can write the code to do it in a couple of lines at most, but everyone who has ever considered doing this very quickly realises that you're a lot better off using one of the convenient higher-level libraries that already exist -- got for Node, requests for Python, etc.
This is because no plan survives contact with the enemy, and no freshly-written
code survives contact with the real world with all its edge cases and grubby
inconsistencies. So if you choose not to use got or requests or similar, you will over time end up doing a half-arsed, poorly-tested reimplementation of them inside your own code. I advise against this. Instead, use the libraries that a dedicated open source community has provided for you (and then send them some funding so they keep doing that).
And this applies to making and collating multiple API requests, too. It looks easy at first, but it's riddled with small edge cases. So that's what the pipeline library is for.
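To see one of those edge cases concretely -- one API call failing while the others succeed -- here's a sketch in plain Node, before we get to the pipeline library at all. The two fetcher functions are made-up stand-ins, not real API calls:

```javascript
// Hypothetical stand-ins for two API calls; one of them fails.
const fetchWeather = async () => ({ short: "lightsnowday" });
const fetchCity = async () => { throw new Error("city API down"); };

async function collate() {
  // Promise.allSettled waits for everything, rather than aborting
  // on the first rejection as Promise.all would.
  const results = await Promise.allSettled([fetchWeather(), fetchCity()]);
  return {
    ok: results.filter(r => r.status === "fulfilled").map(r => r.value),
    failed: results.filter(r => r.status === "rejected").map(r => r.reason.message)
  };
}

collate().then(r => console.log(r.ok.length, r.failed.length)); // → 1 1
```

Easy enough here, but once you add caching, retries, and dependencies between calls, this hand-rolled bookkeeping grows quickly -- which is the gap the pipeline fills.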
Pipelinish reasons
In essence, you build a reusable function that does an API query for each API you want to call, and then the pipeline strings them all together and takes care of aggregating the results. It's not doing the API querying work itself; it handles all the fiddly bookkeeping, plus giving you a few hooks to add conveniences such as caching later if you need them. The pipeline documentation explains in more detail, but the tl;dr is what you've just read. They call the reusable functions "flow elements" and the query parameters "evidence", and the whole thing is implemented in a bunch of different languages: JavaScript, Python, PHP, C#, and Java.
An example is useful here. Let's say that, given a geographical location, we want to combine weather data from 7timer and city data from Teleport. This involves making two separate queries, one to each of the APIs. To do that with the pipeline, we have two steps: make a flow element for each of the 7timer and Teleport APIs, and then add each to a pipeline and run it. Flow elements first.
These examples will be in JavaScript, but as noted the pipeline library is available in many different languages. Here we're roughly working through the Simple Custom Flow Element example from the docs.
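For contrast, here's roughly what the hand-rolled version of that plan looks like without the pipeline. The two query functions are mocked out (the real ones would hit 7timer and Teleport over HTTP); the point is the shape of the code, not the data:

```javascript
// Mocked stand-ins for the two API queries; the real versions
// would make HTTP requests to 7timer and Teleport.
const getWeather = async (lat, lon) => ({ short: "lightsnowday" });
const getCity = async (lat, lon) => ({ name: "Birmingham" });

async function lookup(lat, lon) {
  // Two independent queries, run concurrently, results merged by hand
  const [weather, city] = await Promise.all([
    getWeather(lat, lon), getCity(lat, lon)
  ]);
  return { weather, city };
}

lookup(52.5, -1.8).then(r => console.log(r.weather.short, r.city.name));
// → lightsnowday Birmingham
```

This works, but everything -- concurrency, merging, and eventually error handling and caching -- is our problem. The pipeline version below moves that bookkeeping out of our code.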
A basic JavaScript example
Add the pipeline libraries to your JS project with npm install --save fiftyone.pipeline.core and npm install --save fiftyone.pipeline.engines. For this example we also want the got HTTP library so we can make HTTP requests, so npm install --save got as well. A flow element is defined by the core, and so we subclass it; we need to define a dataKey, which is the key under which this element's results will show up in the final result, and we need to define a processInternal method which actually does the HTTP request and deals with the result. So a minimal flowElement might look like this:
const FiftyOnePipelineCore = require("fiftyone.pipeline.core");
const got = require('got');

class Weather extends FiftyOnePipelineCore.FlowElement {
  constructor () {
    super(...arguments);
    this.dataKey = 'weather';
  }
  async processInternal (flowData) {
    // note that error checking is elided for this example
    let lat = flowData.evidence.get('query.lat');
    let lon = flowData.evidence.get('query.lon');
    const response = await got('http://www.7timer.info/bin/api.pl', {
      responseType: "json",
      searchParams: {
        lat: lat, lon: lon, ac: 0, unit: "metric",
        output: "json", tzshift: 0, product: "civil"
      }
    });
    const data = new FiftyOnePipelineCore.ElementDataDictionary({
      flowElement: this,
      contents: {short: response.body.dataseries[0].weather}
    });
    flowData.setElementData(data);
  }
}
This element will fetch out the query parameters lat and lon that are provided to it, and then construct and fetch an API URL for results. The results are then assembled into an ElementDataDictionary and set on the flowData object, which is the thing that gathers all the results together for use.
To demonstrate, let's construct a pipeline with only this element in it, and run it:
const weather_element = new Weather();
const pipeline = new FiftyOnePipelineCore.PipelineBuilder()
  .add(weather_element)
  .build();

const flowData = pipeline.createFlowData();
flowData.evidence.add('query.lat', 52.5);
flowData.evidence.add('query.lon', -1.8);
flowData.process().then(function () {
  console.log(flowData.weather.short); // prints something like "lightsnowday"
});
This is done in three steps: add elements to a new pipeline and build it; add evidence to the pipeline via flowData; then call flowData.process(), which resolves when the flowData is populated with results. (Here, we simply print that result out; in real use you'd obviously do something more interesting with it.)
The flowData object is more than a simple container. Some flowElements in the pipeline may not be populated, and flowData is designed to make it possible to check for this. In practice this matters less in JavaScript, where it's easy to test at runtime whether an object has a property; but it's very useful in stricter and less forgiving languages such as C# or Java, in which accessing a property that isn't present will cause errors. For more about using hasValue to check for properties, it's worth looking at the documentation's reverse geocoding example, or Which Three Birdies, which explains in detail how this might be done in PHP; we'll skip over it in this example, because JavaScript doesn't need it as much.
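To show what that runtime check looks like in JavaScript, here's a toy sketch. The flowData here is a plain object standing in for the real pipeline result, with the "city" element deliberately unpopulated:

```javascript
// Plain object standing in for a pipeline result; "city" was not populated.
const flowData = { weather: { short: "lightsnowday" } };

function describe(fd) {
  const parts = [];
  // In JavaScript you can simply test for the property at runtime,
  // which is why the hasValue machinery matters less here.
  if ("weather" in fd) parts.push(`weather: ${fd.weather.short}`);
  if ("city" in fd) parts.push(`city: ${fd.city.name}`);
  return parts.join(", ");
}

console.log(describe(flowData)); // → weather: lightsnowday
```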
It must flow
So far, so simple. We haven't really gained all that much by using the pipeline rather than making a request ourselves, of course, but now that the structure is in place, it's easy to add new things. One example here, which is important, is that data flows through the pipeline, to one element after another. This means that later elements can rely on previous elements having run.
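As a toy model of that flow -- not the pipeline library's actual internals, just an illustration of the idea -- each stage reads what earlier stages wrote to a shared result object:

```javascript
// Illustrative stages (names and data are made up): the second stage
// depends on what the first one wrote.
const stages = [
  async (data) => { data.city = { href: "/cities/birmingham" }; },
  async (data) => {
    // This stage can rely on the previous one having already run
    data.population = data.city.href.includes("birmingham") ? 984333 : 0;
  }
];

async function run(stages) {
  const data = {};
  for (const stage of stages) await stage(data); // strictly in order
  return data;
}

run(stages).then(d => console.log(d.population)); // → 984333
```

The pipeline does this sequencing for you, which is what lets the two Teleport elements below be split apart.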
The Teleport APIs allow fetching various data about cities -- quality of life indices, population, and the like. This can be done in multiple steps: first look up the nearest city to a geographical point, then look up basic city info for that place. These could be implemented as one Teleport flowElement, but to illustrate how one might string together multiple elements, let's do it with two: one to look up the nearest city and stash that result into the flowData, and the next to retrieve that result from the flowData and use it to look up city info. Again, this is a simple example, but having the structure of the pipeline is useful here because implementing caching, for example, is much easier once that structure is in place.
/* NearestCity needs to run first to geolocate the lat/long to a city */
class NearestCity extends FiftyOnePipelineCore.FlowElement {
  constructor () {
    super(...arguments);
    this.dataKey = 'city';
    this.evidenceKeyFilter = new FiftyOnePipelineCore
      .BasicListEvidenceKeyFilter(['query.lat', 'query.lon']);
    this.properties = {city: {type: 'string', description: "nearest city"}};
  }
  async processInternal (flowData) {
    let lat = flowData.evidence.get('query.lat');
    let lon = flowData.evidence.get('query.lon');
    const response = await got(
      `https://api.teleport.org/api/locations/${lat}%2C${lon}/`,
      {responseType: "json"});
    let cities = response.body._embedded["location:nearest-cities"];
    cities.sort((a, b) => a.distance_km - b.distance_km);
    const data = new FiftyOnePipelineCore.ElementDataDictionary({
      flowElement: this,
      contents: {city: cities[0]._links["location:nearest-city"]}
      /* this contains an href element to fetch more data */
    });
    flowData.setElementData(data);
  }
}
/* CityData uses the information from NearestCity */
class CityData extends FiftyOnePipelineCore.FlowElement {
  constructor () {
    super(...arguments);
    this.dataKey = 'city';
    this.evidenceKeyFilter = new FiftyOnePipelineCore
      .BasicListEvidenceKeyFilter(['query.lat', 'query.lon']);
    this.properties = {
      population: {type: 'number', description: "number of people"}
    };
  }
  async processInternal (flowData) {
    try {
      let city_nearest_url = flowData.city.city.href;
      const response = await got(city_nearest_url, {responseType: "json"});
      const data = new FiftyOnePipelineCore.ElementDataDictionary({
        /* replace the previous flowData.city */
        flowElement: this, contents: {population: response.body.population}
      });
      // Set this data on the flowData object
      flowData.setElementData(data);
    } catch (error) {
      console.log(error);
      throw(error);
    }
  }
}
const nc_element = new NearestCity();
const weather_element = new Weather();
const citydata_element = new CityData();

const pipeline = new FiftyOnePipelineCore.PipelineBuilder()
  .addParallel([nc_element, weather_element]) // first stage: these two in parallel
  .add(citydata_element) // add CityData afterwards so it can use data from NearestCity
  .build();

const flowData = pipeline.createFlowData();
flowData.evidence.add('query.lat', 52.5);
flowData.evidence.add('query.lon', -1.8);
flowData.process().then(function () {
  console.log(flowData.city.population); // => 984333
  console.log(flowData.weather.short); // => lightsnowday
});
Each of the flowElements is a reusable component, so we can easily add Weather to this pipeline alongside our two new flowElements. Note that the order in which things are added to the pipeline is important. NearestCity and Weather have nothing to do with one another, so we run them in parallel by passing a list of parallel elements to the builder's addParallel. CityData requires the information that NearestCity retrieves, so we add it as a second stage in the pipeline, and then everything works. Data flows from one stage in the pipeline to the next, and can be augmented at each stage.
And for my next trick
That explains how to use the 51Degrees Pipeline library to aggregate API calls to cloud services. What I want to look at in the second half of this series is some more advanced uses: how to use a local ("on-premises") data source, how to add caching in a general way for all flowElements, and error handling and retries.