Gaffa is an event bus designed for situational awareness (SA) applications. The Reconometrics group at Morphism has been developing Gaffa technology for several years.
A consumer provides event patterns, and Gaffa sends matching events to the consumer either synchronously or asynchronously. Each event has a unique id that represents the approximate timestamp at which the service received the event. The service always delivers events to a consumer in monotonically increasing event id order, so a consumer can resume receiving an event stream based on the id of the last event it processed (or based on any timestamp). Alternately, a consumer can request the n most recent events, optionally continuing that stream asynchronously with new matching events as they arrive.
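Because ids are timestamp-like strings delivered in increasing order, client-side resumption reduces to simple bookkeeping. Here's a minimal Python sketch; the helper name and event shapes are ours, with ids in the format shown in the examples later in this document:

```python
# Sketch: client-side resume bookkeeping for a Gaffa-style stream.
# Event ids are timestamp-like strings, so lexicographic comparison
# matches chronological order (an assumption based on the id format
# in this document's examples).

def events_after(events, last_id):
    """Return events whose id is strictly greater than last_id,
    relying on the monotonically increasing id order of a stream."""
    return [e for e in events if e["id"] > last_id]

stream = [
    {"id": "2024-04-11T14:51:24.695382398", "data": "pong"},
    {"id": "2024-04-11T14:54:23.887497871", "data": "pong"},
    {"id": "2024-04-11T14:58:58.993139067", "data": "pong"},
]

# Suppose the consumer crashed after processing the first event:
resumed = events_after(stream, "2024-04-11T14:51:24.695382398")
assert [e["id"] for e in resumed] == [
    "2024-04-11T14:54:23.887497871",
    "2024-04-11T14:58:58.993139067",
]
```

A real consumer would persist the last processed id and supply it as `since` when reconnecting.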
Gaffa itself generates some meta-events. For example, Gaffa generates an event when the service first receives an event with certain characteristics. Gaffa also generates an event when it has not received an expected event.
Here’s an example of a simple consumer request for a stream of geomagnetism data from Barrow (BRW) and Boulder (BOU). The service will stream matching events starting 24 hours ago and continuing with real-time data as it arrives.
```json
{
  "since": "24h",
  "listen": true,
  "patterns": [
    {
      "type": ["geomag.usgs.gov/geomag"],
      "data": {
        "metadata": {
          "intermagnet": {
            "imo": {
              "iaga_code": ["BRW", "BOU"]
            }
          }
        }
      }
    }
  ]
}
```
Our Gaffa services monitor many sources. When a monitor sees a new event, the event is forwarded to Gaffa instances, and Gaffa consumers receive those events immediately (if desired). Latency depends on specific service level agreements with a lower bound below 100 milliseconds from ingest to forwarding to a consumer.
To support robust situational awareness applications, Gaffa provides:
Feature | Description
---|---
Pattern-based event filtering | Currently Gaffa supports AWS EventBridge-style pattern matching, and the service is designed to support other filtering in the future.
Simple streaming to consumers | Currently Gaffa uses server-sent events for requests for streams of events. The service is designed to support other streaming protocols in the future. Gaffa also supports traditional synchronous HTTP requests.
Streaming control | A consumer can receive a stream of events as they arrive. This stream can start with old events (based on id, timestamp, or “nth oldest”) and transition transparently to live events.
Event ids as timestamps | An event id represents the approximate timestamp at which the service received the event. Events are always delivered in monotonically increasing id order for a given stream.
Sophisticated query planner | The extensible query planner supports several query/streaming optimizations.
Flexible deduplication | Event properties can indicate whether those properties should be considered when deduplicating incoming events.
Dead person’s switches | For some event sources, Gaffa monitors the arrival times of new messages. When a message has not arrived within a specified interval (perhaps on a per-attribute basis), Gaffa emits an event reporting the lack of messages with the relevant attributes within that interval.
Novel event events | Certain events are published with keys that Gaffa uses to generate events for the first time that key has been seen.
Per-event TTL | An event publisher can specify a time-to-live duration for a Gaffa event. After that interval, Gaffa deletes the event.
Scalable to many concurrent consumers | Gaffa can scale hardware horizontally (and/or vertically) to support more consumer traffic to meet a wide range of service level objectives.
No consumer state | Gaffa does not store any consumer state. A consumer is responsible for remembering the id of the last event that it processed successfully (if desired).
Currently Gaffa can respond with JSON, server-sent events (SSE), and RSS (XML).
A Gaffa instance typically receives incoming events from other applications that run internally. Our current production Gaffa instances do not receive arbitrary incoming events from the public.
Gaffa is a generic event bus; however, current production is managed using separate Gaffa instances for different domains.
We are currently using Gaffa in these domains:
Each event has the following properties:
Property | Required? | Description
---|---|---
`id` | required | A unique identifier that also serves as a timestamp indicating the approximate time the event was processed.
`payload` | required | An object that contains the actual data.
`type` | optional | A string specifying the type of the payload.
`types` | optional | An array of names (strings) of types for a payload that has multiple types.
`source` | optional | The name of the source of the event. This value corresponds to `name` in a source specification (see below).
`subject` | optional | A string indicating the subject of the event.
`subjects` | optional | An array of strings indicating the subjects of the event.
`tags` | optional | An array of short strings representing topics of the event.
`time` | optional | An RFC3339 time (milliseconds, Z) representing the time of the event in a sense that is event-specific. For an event that is an observation, this time is usually the time of that observation. For an event that is a prediction, this time is usually the predicted time.
These attributes are hopefully compatible with and extend the CloudEvents specification. The extensions `types` and `subjects` support multiple values for `type` and `subject` respectively.
The `/sources` API returns a set of event sources. Each event source can have the following properties:
Property | Required? | Description
---|---|---
`name` | required | A string that gives a unique name for the source.
`version` | optional | A string specifying the version of the source.
`types` | optional | An array listing possible event types for events emitted by the source.
`url` | optional | A URL pointing to some authoritative documentation for the source.
`citation` | optional | A string providing a short description of the publisher.
`capabilities` | optional | A specification (details TBD) about support for dead person’s switches, novel events, and other functionality supported by the source.
In most cases, the events generated from a source follow the original source data as much as possible. Tabular records (as in CSV data) are converted to maps with column names as keys.
API | Description
---|---
`/sources` | Get current event sources (JSON)
`/stream` | Receive a stream (SSE) of events
`/query` | Receive past events (JSON)
`/get` | Get a specific event by id
`/rssfeed` | RSS feed (XML)
`/explain` | Very experimental analysis of a consumer request
`/limits` | Get current consumer limits (JSON)
`/throttle` | Get current rate limiting (JSON)
`/match` | Utility to test event pattern matching
`/ping` | Ping-pong with the service
`/schemas` | Get some JSON Schemas
`/version` | Version information (JSON)
/sources

Returns the set of active sources.
Parameters:
Parameter | Required? | Description
---|---|---
`pretty=true` | optional | Pretty-print the response.
/stream

Receive matching events via server-sent events. The SSE event type is (usually) `message`.
Parameters:
Parameter | Required? | Description
---|---|---
`q` | required | The consumer specification or an array of consumer specifications (see below).
`events` | optional | A string that’s used as the value of the `event` property in the SSE response. If this value is `index`, then the `event` property for an event will have the form `message-N`, where N is the index of the consumer specification (in the array of consumer specifications) that produced this event. If `events` is `none`, then the output is just JSON without any SSE formatting.
The consumer specification is a JSON object with these properties:
Property | Required? | Description
---|---|---
`patterns` | required | An array of patterns (see below).
`since` | optional | A string timestamp (like `YYYY-MM-DD` or `YYYY-MM-DDTHH:MM:SSZ`) or an event id. The consumer will receive events starting with the first event with an id greater than this value. Alternately, you can use Go’s `Duration` syntax to specify an interval in the past (like `1h`, which would mean “an hour ago”). Alternately, zero represents the beginning of time.
`last` | optional | An integer n indicating that the consumer should receive events starting with the n most recent matching events.
Note that the `patterns` property of a consumer specification is an array of patterns. If the consumer wants to specify `last` separately for different sets of patterns, the request can provide an array of consumer specifications. In this case, the event ids that are returned in the SSE response have a more complicated form to support what are essentially multiplexed streams. Each event still carries its unique id, and these ids are strictly ordered per consumer specification but not globally.
If the request headers include `Last-Event-ID`, then that value will be used for `since`. In the case of an array of consumer specifications in the request, the `id` of the SSE event will be a complex value that is not easily interpretable as an event id. This behavior supports resumption of multiplexed streams. Use the `id` property in each event to obtain that event’s unique identifier.
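To illustrate, here is a rough Python sketch of parsing an SSE response body and tracking the last event id for resumption. The parser is deliberately simplified (it ignores `id:`, `retry:`, and comment lines); only the standard SSE `event:`/`data:` framing and the per-event JSON `id` property described in this document are assumed:

```python
# Sketch: parse server-sent events and track the last event id for
# resumption. "event:" and "data:" are standard SSE fields; the JSON
# "id" property inside the data is per this document.
import json

def parse_sse(text):
    """Yield (event_type, data) tuples from raw SSE text."""
    event_type, data_lines = "message", []
    for line in text.splitlines():
        if line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif line == "" and data_lines:  # blank line ends one SSE event
            yield event_type, "\n".join(data_lines)
            event_type, data_lines = "message", []

raw = (
    "event: message\n"
    'data: {"id":"2024-04-11T14:51:24.695382398","data":"pong"}\n'
    "\n"
    'data: {"id":"2024-04-11T14:54:23.887497871","data":"pong"}\n'
    "\n"
)

last_id = None
for _etype, payload in parse_sse(raw):
    event = json.loads(payload)
    last_id = event["id"]  # the event's own id, not the SSE id field

assert last_id == "2024-04-11T14:54:23.887497871"
```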
The service enforces some rate limiting. See `/throttle` for current limits.
Here’s a simple example:
```json
{
  "since": "1h",
  "patterns": [
    {
      "sourceName": ["geomag.usgs.gov/sources/data"],
      "payload": {"values": {"metadata": {"station": ["BOU"]}}}
    },
    {"sourceName": ["celestrak.org/decays"]}
  ]
}
```
This specification will receive events that match that pattern starting with events received approximately one hour ago and continuing until the SSE session times out (currently set for 15 minutes) or is closed.
Here’s a complete command-line example:
```
curl -G -d 'q={"last":3,"patterns":[{"source":["system"]}]}' 'localhost:8080/stream?events=none'
{"id":"2024-04-11T14:51:24.695382398","data":"pong","source":"system","time":"2024-04-11T14:51:24.695Z","type":"system/pong"}
{"id":"2024-04-11T14:51:24.706355285","data":{"count":1207,"intervalSecs":60},"source":"system","time":"2024-04-11T14:51:24.706Z","type":"system/msgCount"}
{"id":"2024-04-11T14:54:23.887497871","data":"pong","source":"system","time":"2024-04-11T14:54:23.887Z","type":"system/pong"}
{"id":"2024-04-11T14:58:58.993139067","type":"system/pong","time":"2024-04-11T14:58:58.992Z","data":"pong","source":"system"}
{"id":"2024-04-11T14:59:28.992956868","type":"system/pong","time":"2024-04-11T14:59:28.992Z","data":"pong","source":"system"}
{"id":"2024-04-11T14:59:28.995401739","type":"system/msgCount","time":"2024-04-11T14:59:28.994Z","data":{"count":3391,"intervalSecs":60},"source":"system"}
{"id":"2024-04-11T14:59:58.992991076","type":"system/pong","time":"2024-04-11T14:59:58.992Z","data":"pong","source":"system"}
```
This example uses `events=none` to produce plain line-delimited JSON.
/query

Returns an array of matching events as JSON.
Parameters:
Parameter | Required? | Description
---|---|---
`q` | required if not in body | The consumer specification or array of specifications.
`pretty=true` | optional | Pretty-print the response.
The consumer specification(s) can also be provided in the body of a POST request (without the `q=`).
The consumer specification is a JSON object with these properties:
Property | Required? | Description
---|---|---
`patterns` | required | An array of patterns (see below).
`since` | optional | A string timestamp (like `YYYY-MM-DD` or `YYYY-MM-DDTHH:MM:SSZ`) or an event id. The consumer will receive events starting with the first event with an id greater than this value. Alternately, you can use Go’s `Duration` syntax to specify an interval in the past (like `1h`, which would mean “an hour ago”).
`limit` | optional | An integer n indicating that the consumer should receive no more than n events.
`last` | optional | An integer n indicating that the consumer should receive events starting with the n most recent matching events.
Note that the `patterns` property of a consumer specification is an array of patterns. If the consumer wants to specify `limit` and/or `last` separately for different sets of patterns, the request can provide an array of consumer specifications. In this case, the event ids that are returned in the response have a more complicated form to support what are essentially multiplexed streams. Each event still carries its unique id, and these ids are strictly ordered per consumer specification but not globally.
/get

Get an event by id.
Parameters:
Parameter | Required? | Description
---|---|---
`id` | required | The id of the desired event.
`pretty=true` | optional | Pretty-print the response.
Response is JSON representing the requested event if it exists.
/rssfeed
Get a query result as an RSS feed.
Parameters:
Parameter | Required? | Description
---|---|---
`title` | optional | The title of the feed.
`description` | optional | The description of the feed.
`pattern` | required | The Gaffa event pattern for the desired events.
`titlegen` | optional | A jq-compatible program that should return a string. The event is provided as input to this program, and the output becomes the title for the item. The implementation is currently based on gojq.
`descgen` | optional | A jq-compatible program that should return a string. The event is provided as input to this program, and the output becomes the description for the item. The implementation is currently based on gojq.
The response is currently limited to 10 events.
Example:
```shell
curl -G \
  --data-urlencode 'pattern={"type":["system/msgCount"]}' \
  --data-urlencode 'title=Example RSS feed' \
  --data-urlencode 'description=Just some system messages.' \
  --data-urlencode 'titlegen="\(.count) Gaffa messages ingested"' \
  --data-urlencode 'descgen="\(.count) Gaffa messages ingested at \(.time)."' \
  https://gaffa.reconometrics.com/rssfeed
```
/explain

Returns some query cost and planning data about a consumer request.
Parameters:
Parameter | Required? | Description
---|---|---
`q` | required if not in body | The consumer specification or array of consumer specifications.
`pretty=true` | optional | Pretty-print the response.
The consumer specification(s) can also be provided in the body of a POST request (without the `q=`).
Property | Required? | Description
---|---|---
`patterns` | required | An array of patterns (see below).
`since` | optional | A string timestamp (like `YYYY-MM-DD` or `YYYY-MM-DDTHH:MM:SSZ`) or an event id. The consumer will receive events starting with the first event with an id greater than this value. Alternately, you can use Go’s `Duration` syntax to specify an interval in the past (like `1h`, which would mean “an hour ago”).
`limit` | optional | An integer n indicating that the consumer should receive no more than n events.
`last` | optional | An integer n indicating that the consumer should receive events starting with the n most recent matching events.
`stream` | optional | A boolean indicating whether the consumer will stream events or not.
/ping
Ping-pong with the service. This test is shallow and only exercises minimal functionality.
/limits
Get current service limits.
/throttle
Get current rate limits.
/match
A utility API to test event pattern matching. Returns `true` or `false` to indicate whether the given `event` matches the given `pattern`.
Parameter | Required? | Description
---|---|---
`pattern` | required | An event pattern.
`event` | required | An event (JSON).
```shell
curl -G \
  --data-urlencode 'pattern={"likes":["tacos"]}' \
  --data-urlencode 'event={"likes":"tacos"}' \
  gaffa.reconometrics.com/match
```
/schemas

Get JSON schemas related to the service.
This pattern matching is based on AWS EventBridge patterns with an implementation based on Quamina.
A pattern is an object whose leaves are arrays. Each leaf specifies a constraint for an atomic value in an event, which is an arbitrary object. A pattern leaf (an array) represents either a set of literal values or a constraint. An event matches a pattern if the properties of the event match the corresponding properties of the pattern. An array in an event matches a corresponding pattern if any element of the array matches the pattern.
In the context of pattern matching, an “event” is just an arbitrary JSON object.
Examples:

- The event `{"likes":"tacos","when":"daily"}` matches the pattern `{"likes":["tacos","queso"]}`.
- `{"likes":["tacos","chips"],"when":"daily"}` matches `{"likes":["chips","queso"]}`.
- `{"x":42,"y":143}` matches `{"y":[{"numeric":[">",100]}]}`.
- `{"prepare":{"what":"tacos","count":3}}` matches `{"prepare":{"what":["tacos","queso"]}}`.
- `{"aux":{"conj":{"dist":1.23}}}` matches `{"aux":{"conj":{"dist":[{"exists":true}]}}}`.
A pattern is also a JSON object. Each leaf value is called a value constraint, and it specifies the requirement for the event’s value at the same path (if any).
A value constraint is an array (with zero or more elements).
A literal value constraint is a value constraint that is an array that contains only atomic literal values. Examples:

```json
["chips", "queso", "tacos"]
[true, 1e2, 42, "guacamole"]
```
In the current implementation, an atomic value is treated as a singleton array. This behavior departs from both EventBridge and Quamina, but it can be convenient.
For example, `{"likes":"tacos"}` is equivalent to `{"likes":["tacos"]}`.
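This singleton-array treatment can be sketched in a line of Python (the function name is ours):

```python
# Sketch: an atomic value in a pattern is treated as a one-element
# literal value constraint, matching the equivalence described above.
def normalize_constraint(value):
    return value if isinstance(value, list) else [value]

assert normalize_constraint("tacos") == ["tacos"]  # atom -> singleton array
assert normalize_constraint(["tacos", "queso"]) == ["tacos", "queso"]
```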
An extended constraint is a value constraint (an array) that contains a single object. The object typically contains a single property, which serves as name for the type of the constraint. The value for that property specifies the constraint in a manner that’s specific to the constraint type.
For example, the following value constraint is a `numeric` extended constraint:

```json
[{"numeric":["<",42]}]
```
A pattern is an object whose property values are either nested patterns or value constraints. See below for the supported extended constraints.
An event property name that starts with `#` is not available for pattern matching, so a pattern with a property name that starts with `#` is not allowed.
An atomic value matches a literal value constraint if the value is present in the constraint’s array.
An array of atomic values matches a literal value constraint if any element of the array is present in the literal value constraint’s array.
An atomic value matches an extended constraint if the value satisfies the specific requirements of that extended constraint.
An object matches a pattern if all properties in the event pattern are present in the event. (Note that `{"exists":false}` is not currently supported.)
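The matching rules above can be sketched in Python for the literal-constraint case. This is an illustration of the stated semantics, not Gaffa’s actual implementation (which is based on Quamina), and it omits extended constraints such as `numeric`:

```python
# Sketch of the literal matching rules: an atomic value matches if it
# appears in the constraint array; an array matches if any element
# does; an object matches if every pattern property is present and
# matches. Extended constraints are intentionally not handled.
def matches(event, pattern):
    if not isinstance(pattern, dict):
        return False
    for key, constraint in pattern.items():
        if key not in event:
            return False
        value = event[key]
        if isinstance(constraint, dict):  # nested object pattern
            if not (isinstance(value, dict) and matches(value, constraint)):
                return False
        else:  # literal value constraint (an array of literals)
            values = value if isinstance(value, list) else [value]
            if not any(v in constraint for v in values):
                return False
    return True

assert matches({"likes": "tacos", "when": "daily"}, {"likes": ["tacos", "queso"]})
assert matches({"likes": ["tacos", "chips"]}, {"likes": ["chips", "queso"]})
assert matches({"prepare": {"what": "tacos", "count": 3}},
               {"prepare": {"what": ["tacos", "queso"]}})
assert not matches({"likes": "salad"}, {"likes": ["tacos", "queso"]})
```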
Here’s the current complete list of supported non-literal leaf predicates:
Predicate | Description
---|---
`prefix` | Per Quamina.
`exists` | Per Quamina, except that only `"exists":true` is supported.
`anything-but` | Per Quamina.
`shellstyle` | Per Quamina.
`numeric` | Similar to what EventBridge provides, with these additions:
`string` | Similar to what EventBridge provides for `numeric` except for strings, with this addition:
`equals-insentitive` | String equality without regard for case.
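As an illustration of how a `numeric` constraint might be evaluated, here is a Python sketch using EventBridge’s alternating operator/operand form (e.g. `[">",100]` or `[">=",0,"<",10]`). The exact operator set Gaffa supports (the “additions” above) is not specified here, so treat this set as an assumption:

```python
# Sketch: evaluate a "numeric" extended constraint given as an array
# of alternating operator/operand pairs, EventBridge style.
# The operator set below is illustrative, not Gaffa's documented set.
import operator

OPS = {"<": operator.lt, "<=": operator.le, "=": operator.eq,
       ">": operator.gt, ">=": operator.ge}

def numeric_matches(value, args):
    """True if value satisfies every (op, operand) pair in args."""
    pairs = zip(args[0::2], args[1::2])  # e.g. [">=", 0, "<", 10]
    return all(OPS[op](value, operand) for op, operand in pairs)

assert numeric_matches(143, [">", 100])       # as in the example above
assert numeric_matches(5, [">=", 0, "<", 10]) # a bounded range
assert not numeric_matches(42, ["<", 42])
```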
Gaffa supports dead person’s switches that publish events when an expected event doesn’t arrive after a specified interval. An “event didn’t arrive” event has the following structure:
Property | Value
---|---
`type` | `system/dms`
`source` | `system`
`event` | The event that went missing.
`last` | The timestamp the event was last seen.
`elapsedSecs` | The number of seconds elapsed since the previous event arrived.
Each event published to Gaffa can be annotated with data to support these switches. We’re in the process of enabling many of the current event sources to provide this additional information when appropriate.
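The bookkeeping behind such a switch can be sketched as follows. The function and key names are ours; only the emitted event shape follows the table above, and we use plain epoch seconds and string keys for simplicity:

```python
# Sketch: a dead person's switch checker. last_seen maps a watched key
# to the time its most recent event arrived; any key whose event is
# older than interval_secs produces a system/dms-shaped alert.
def check_switches(last_seen, now, interval_secs):
    alerts = []
    for key, last in last_seen.items():
        elapsed = now - last
        if elapsed > interval_secs:
            alerts.append({
                "type": "system/dms",
                "source": "system",
                "event": key,          # stands in for the missing event
                "last": last,
                "elapsedSecs": elapsed,
            })
    return alerts

# BOU's last event is 1000s old, BRW's only 100s old:
last_seen = {"geomag/BOU": 1000, "geomag/BRW": 1900}
alerts = check_switches(last_seen, now=2000, interval_secs=600)
assert [a["event"] for a in alerts] == ["geomag/BOU"]
assert alerts[0]["elapsedSecs"] == 1000
```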
Certain events are published with keys that are used to generate events the first time a key has been seen. When Gaffa receives the first event with a given key, Gaffa generates an event with the following form:
```json
{
  "source": "system",
  "type": "system/novelty",
  "key": {},
  "event": {}
}
```
The `key` value is the novelty key above, and the `event` value is the event that triggered this novelty event.
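A sketch of this first-seen logic (the names are ours, and we use string keys for simplicity, while the example above shows object-valued keys):

```python
# Sketch: emit a system/novelty event the first time a key is seen.
# seen_keys holds every key observed so far; repeat keys yield None.
def process(event, seen_keys):
    key = event["key"]
    if key in seen_keys:
        return None
    seen_keys.add(key)
    return {"source": "system",
            "type": "system/novelty",
            "key": key,
            "event": event}  # the event that triggered the novelty

seen = set()
first = process({"key": "sat-25544", "data": "decay"}, seen)
second = process({"key": "sat-25544", "data": "decay"}, seen)
assert first["type"] == "system/novelty"
assert second is None  # only the first occurrence is novel
```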
The following examples use the Space Domain Events (SDE) service, which might be running here.
Get a stream of all events from the `system` source. In each case, we use `events=none` to get just JSON lines instead of server-sent events. We are also doing POSTs instead of GETs. To do a GET, pass the body of the POST as the value of the query parameter `q`.
```
cat <<EOF | curl -d @- https://gaffa.reconometrics.com/stream?events=none
{
  "last": 3,
  "patterns": [
    {
      "source": ["system"]
    }
  ]
}
EOF
{"id":"2024-04-12T17:32:49.605963080","data":"pong","source":"system","time":"2024-04-12T17:32:49.605Z","type":"system/pong"}
{"id":"2024-04-12T17:33:19.606209609","data":"pong","source":"system","time":"2024-04-12T17:33:19.606Z","type":"system/pong"}
{"id":"2024-04-12T17:33:19.612978404","data":{"count":0,"intervalSecs":60},"source":"system","time":"2024-04-12T17:33:19.612Z","type":"system/msgCount"}
```