Reconometrics
Gaffa event bus manual

Introduction

Gaffa is an event bus designed for situational awareness (SA) applications. The Reconometrics group at Morphism has been developing Gaffa technology for several years.

A consumer provides event patterns, and Gaffa sends matching events to the consumer either synchronously or asynchronously. Each event has a unique id that represents the approximate time at which the service received the event. The service always delivers events to a consumer in monotonically increasing event id order, so a consumer can resume receiving an event stream based on the id of the last event it processed (or based on any timestamp). Alternatively, a consumer can request the n most recent events, optionally continuing that stream asynchronously with new matching events as they arrive.

Gaffa itself generates some meta-events. For example, Gaffa generates an event when the service first receives an event with certain characteristics. Gaffa also generates an event when it has not received an expected event.

Here’s an example of a simple consumer request for a stream of geomagnetism data from Barrow (BRW) and Boulder (BOU). The service will stream matching events starting 24 hours ago and continuing with real-time data as it arrives.

{
    "since": "24h",
    "listen": true,
    "patterns": [
        {
            "type": [
                "geomag.usgs.gov/geomag"
            ],
            "data": {
                "metadata": {
                    "intermagnet": {
                        "imo": {
                            "iaga_code": [
                                "BRW", "BOU"
                            ]
                        }
                    }
                }
            }
        }
    ]
}

Our Gaffa services monitor many sources. When a monitor sees a new event, the event is forwarded to Gaffa instances, and Gaffa consumers receive those events immediately (if desired). Latency depends on specific service level agreements with a lower bound below 100 milliseconds from ingest to forwarding to a consumer.

Functional summary

To support robust situational awareness applications, Gaffa provides:

Pattern-based event filtering: Currently Gaffa supports AWS EventBridge-style pattern matching, and the service is designed to support other filtering in the future.
Simple streaming to consumers: Currently Gaffa uses server-sent events for requests for streams of events. The service is designed to support other streaming protocols in the future. Gaffa also supports traditional synchronous HTTP requests.
Streaming control: A consumer can receive a stream of events as they arrive. This stream can start with old events (based on id, timestamp, or “nth oldest”) and transition transparently to live events.
Event ids as timestamps: An event id represents the approximate time at which the service received the event. Events are always delivered in monotonically increasing id order for a given stream.
Sophisticated query planner: The extensible query planner supports several query/streaming optimizations.
Flexible deduplication: Event properties can indicate whether those properties should be considered when deduplicating incoming events.
Dead Person’s Switches: For some event sources, Gaffa monitors the arrival times of new messages. When a message has not arrived within a specified interval (perhaps on a per-attribute basis), Gaffa emits an event reporting the lack of messages with the relevant attributes within that interval.
Novel event events: Certain events are published with keys; Gaffa generates an event the first time a given key is seen.
Per-event TTL: An event publisher can specify a time-to-live duration for a Gaffa event. After that interval, Gaffa deletes the event.
Scalable to many concurrent consumers: Gaffa can scale hardware horizontally (and/or vertically) to support more consumer traffic and meet a wide range of service level objectives.
No consumer state: Gaffa does not store any consumer state. A consumer is responsible for remembering the id of the last event that it processed successfully (if desired).
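
Because Gaffa keeps no consumer state, resuming a stream is the consumer's job. The following is a minimal sketch, assuming the consumer specification properties documented under API /stream (patterns and since). The helper name resume_spec, its default_since fallback, and the in-memory checkpoint variable are hypothetical and only illustrate the idea.

```python
import json

def resume_spec(patterns, last_id=None, default_since="1h"):
    """Build a consumer specification that resumes after last_id.

    When nothing has been processed yet, fall back to a relative
    interval (default_since is an arbitrary choice for this sketch).
    """
    return {"patterns": patterns, "since": last_id or default_since}

patterns = [{"source": ["system"]}]

# The consumer, not Gaffa, remembers the id of the last processed event.
checkpoint = None
first_spec = resume_spec(patterns, checkpoint)
print(json.dumps(first_spec))

# After handling an event, store its id and use it for the next request.
checkpoint = "2024-04-11T14:51:24.695382398"
next_spec = resume_spec(patterns, checkpoint)
print(json.dumps(next_spec))
```

In a real consumer the checkpoint would be written to durable storage after each successfully processed event.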

Currently Gaffa can respond with:

A Gaffa instance typically receives incoming events from other applications that run internally. Our current production Gaffa instances do not receive arbitrary incoming events from the public.

Domains

Gaffa is a generic event bus; however, current production is managed using separate Gaffa instances for different domains.

We are currently using Gaffa in these domains:

Events

Each event has the following properties:

id required A unique identifier that also serves as a timestamp indicating the approximate time the event was processed.
payload required An object that contains the actual data.
type optional A string specifying the type of the payload.
types optional An array of names (strings) of types for a payload that has multiple types.
source optional The name of the source of the event. This value corresponds to name in a source specification (see below).
subject optional A string indicating the subject of the event.
subjects optional An array of strings indicating the subjects of the event.
tags optional An array of short strings representing topics of the event.
time optional An RFC3339 time (milliseconds Z) representing the time of the event in a sense that is event-specific. For an event that is an observation, this time is usually the time of that observation. For an event that is a prediction, this time is usually the predicted time.

These attributes are intended to be compatible with, and to extend, the CloudEvents specification. The extensions types and subjects support multiple values for type and subject respectively.
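
Since an id doubles as a timestamp, a consumer may want to turn it into a native time value. The sketch below assumes ids have the shape seen in the examples in this manual (an ISO-like date and time with a nanosecond fraction) and that they are in UTC; neither is guaranteed by the text above. The function name id_to_datetime is hypothetical.

```python
from datetime import datetime, timezone

def id_to_datetime(event_id):
    """Convert an id like '2024-04-11T14:51:24.695382398' to a datetime.

    Python datetimes carry microseconds, so the nanosecond fraction is
    truncated to six digits. UTC is an assumption of this sketch.
    """
    seconds, _, fraction = event_id.partition(".")
    micros = int((fraction + "000000")[:6])
    base = datetime.strptime(seconds, "%Y-%m-%dT%H:%M:%S")
    return base.replace(microsecond=micros, tzinfo=timezone.utc)

received = id_to_datetime("2024-04-11T14:51:24.695382398")
print(received.isoformat())
```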

Event sources

The /sources API returns a set of event sources. Each event source can have the following properties:

name required A string that gives a unique name for the source.
version optional A string specifying the version of the source.
types optional An array listing possible event types for events emitted by the source.
url optional A URL pointing to some authoritative documentation for the source.
citation optional A string providing a short description of the publisher.
capabilities optional A specification (details TBD) about support for dead person’s switches, novel events, and other functionality supported by the source.

In most cases, the events generated from a source follow the original source data as much as possible. Tabular records (as in CSV data) are converted to maps with column names as keys.
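
As an illustration of the tabular conversion, here is a minimal sketch that uses Python's standard csv module. The column names and values are made up for the example and do not come from any Gaffa source.

```python
import csv
import io

# Hypothetical CSV input; a real source defines its own columns.
csv_text = "station,component,value\nBOU,H,20812.5\nBRW,H,9231.1\n"

# Each row becomes a map with the column names as keys.
records = list(csv.DictReader(io.StringIO(csv_text)))
print(records[0])
```

Note that csv.DictReader leaves every value as a string; whether Gaffa converts numeric columns is not specified here.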

APIs

Summary

API        Description
/sources   Get current event sources (JSON)
/stream    Receive a stream (SSE) of events
/query     Receive past events (JSON)
/get       Get a specific event by id
/rssfeed   RSS feed (XML)
/explain   Very experimental analysis of a consumer request
/limits    Get current consumer limits (JSON)
/throttle  Get current rate limiting (JSON)
/match     Utility to test event pattern matching
/ping      Ping pong with the service
/schemas   Get some JSON Schemas
/version   Version information (JSON)

API /sources

/sources returns the set of active sources.

Parameters:

pretty=true optional Pretty-print the response.

API /stream

Receive matching events via server-sent events. The SSE event type is (usually) message.

Parameters:

q required The consumer specification or an array of consumer specifications (see below).
events optional A string used as the value of the event property in the SSE response. If this value is index, then the event property for an event has the form message-N, where N is the index (in the array of consumer specifications) of the consumer specification that produced this event. If events is none, then the output is just JSON without any SSE formatting.

The consumer specification is a JSON object with these properties:

patterns required An array of patterns (see below).
since optional String timestamp (like YYYY-MM-DD or YYYY-MM-DDTHH:MM:SSZ) or an event id. The consumer will receive events starting with the first event with an id greater than this value. Alternatively, you can use Go’s Duration syntax to specify an interval in the past (like 1h, which would mean “an hour ago”). The value zero represents the beginning of time.
last optional Integer n indicating that the consumer should receive events starting with the n most recent matching events.
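
To make the relative form of since concrete, the sketch below parses an unsigned Go-style duration on the client side and computes the corresponding point in the past. It is only an illustration of what a value like 1h means; the service itself presumably relies on Go's own duration parsing. The names parse_go_duration and since_cutoff are hypothetical.

```python
import re
from datetime import datetime, timedelta, timezone

# Unit suffixes accepted by Go's duration syntax, in seconds.
_UNITS = {"ns": 1e-9, "us": 1e-6, "µs": 1e-6, "ms": 1e-3,
          "s": 1.0, "m": 60.0, "h": 3600.0}
_PART = re.compile(r"(\d+(?:\.\d+)?)(ns|us|µs|ms|s|m|h)")

def parse_go_duration(text):
    """Parse an unsigned Go-style duration such as '24h' or '1h30m'."""
    pos, seconds = 0, 0.0
    while pos < len(text):
        m = _PART.match(text, pos)
        if m is None:
            raise ValueError(f"not a duration: {text!r}")
        seconds += float(m.group(1)) * _UNITS[m.group(2)]
        pos = m.end()
    if pos == 0:
        raise ValueError("empty duration")
    return timedelta(seconds=seconds)

def since_cutoff(since, now):
    """Return the moment in the past that a relative since value refers to."""
    return now - parse_go_duration(since)

now = datetime(2024, 4, 11, 15, 0, tzinfo=timezone.utc)
print(since_cutoff("1h", now).isoformat())
```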

Note that the patterns property of a consumer specification is an array of patterns. If the consumer wants to specify last separately for different sets of patterns, the request can provide an array of consumer specifications. In this case, the event ids that are returned in the SSE response have a more complicated form to support what are essentially multiplexed streams. Each event still carries its unique id, and these ids are strictly ordered per consumer specification but not globally.

If the request headers include Last-Event-ID, then that value will be used for since. In the case of an array of consumer specifications in the request, the id of the event will be a complex value that is not easily interpretable as an event id. This behavior supports resumption of multiplexed streams. Use the id property in each event to obtain that event’s unique identifier.
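
The following sketch shows how a client might split an SSE response body into frames and keep track of the value to send back as Last-Event-ID. It follows the general server-sent events framing rules; the sample frame is hypothetical, since this manual does not show Gaffa's raw SSE output, and parse_sse is a made-up helper name.

```python
import json

def parse_sse(text):
    """Yield (event_type, sse_id, data) for each complete frame in an SSE body."""
    event_type, sse_id, data = "message", None, []
    for line in text.splitlines():
        if line == "":                       # a blank line ends a frame
            if data:
                yield event_type, sse_id, "\n".join(data)
            event_type, data = "message", []
            continue
        if line.startswith(":"):             # comment or keep-alive line
            continue
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]
        if field == "event":
            event_type = value
        elif field == "id":
            sse_id = value                   # persists until replaced
        elif field == "data":
            data.append(value)

# Hypothetical frame layout for illustration only.
sample = (
    "event: message\n"
    "id: 2024-04-11T14:51:24.695382398\n"
    'data: {"id":"2024-04-11T14:51:24.695382398","type":"system/pong"}\n'
    "\n"
)

last_event_id = None
for event_type, sse_id, data in parse_sse(sample):
    event = json.loads(data)
    last_event_id = sse_id                   # send as Last-Event-ID on reconnect
    print(event_type, event["id"])
```

For multiplexed requests the SSE id may be an opaque value, so the event's own id property (inside data) is the one to use as the event identifier.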

The service enforces some rate limiting. See /throttle for current limits.

Here’s a simple example:

{"since":"1h",
 "patterns":[{"sourceName":["geomag.usgs.gov/sources/data"],
              "payload":{"values":{"metadata":{"station":["BOU"]}}}},
             {"sourceName":["celestrak.org/decays"]}]}

A consumer with this specification receives events that match either pattern, starting with events received approximately one hour ago and continuing until the SSE session times out (currently after 15 minutes) or is closed.
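
A specification like this has to be URL-encoded when it is passed as the q parameter of a GET request. Here is a minimal sketch using Python's standard library; the host is the one used in other examples in this manual, and the request is only constructed, not sent.

```python
import json
from urllib.parse import parse_qs, urlencode, urlsplit

spec = {
    "since": "1h",
    "patterns": [
        {"sourceName": ["geomag.usgs.gov/sources/data"],
         "payload": {"values": {"metadata": {"station": ["BOU"]}}}},
        {"sourceName": ["celestrak.org/decays"]},
    ],
}

# Serialize compactly, then let urlencode handle the escaping.
query = urlencode({"q": json.dumps(spec, separators=(",", ":"))})
url = "https://gaffa.reconometrics.com/stream?" + query
print(url)
```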

Here’s a complete command-line example:

curl -G -d 'q={"last":3,"patterns":[{"source":["system"]}]}' 'localhost:8080/stream?events=none'
{"id":"2024-04-11T14:51:24.695382398","data":"pong","source":"system","time":"2024-04-11T14:51:24.695Z","type":"system/pong"}
{"id":"2024-04-11T14:51:24.706355285","data":{"count":1207,"intervalSecs":60},"source":"system","time":"2024-04-11T14:51:24.706Z","type":"system/msgCount"}
{"id":"2024-04-11T14:54:23.887497871","data":"pong","source":"system","time":"2024-04-11T14:54:23.887Z","type":"system/pong"}
{"id":"2024-04-11T14:58:58.993139067","type":"system/pong","time":"2024-04-11T14:58:58.992Z","data":"pong","source":"system"}
{"id":"2024-04-11T14:59:28.992956868","type":"system/pong","time":"2024-04-11T14:59:28.992Z","data":"pong","source":"system"}
{"id":"2024-04-11T14:59:28.995401739","type":"system/msgCount","time":"2024-04-11T14:59:28.994Z","data":{"count":3391,"intervalSecs":60},"source":"system"}
{"id":"2024-04-11T14:59:58.992991076","type":"system/pong","time":"2024-04-11T14:59:58.992Z","data":"pong","source":"system"}

This example uses events=none to produce plain line-delimited JSON.
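
Line-delimited JSON is easy to consume: each line is one event. The sketch below parses the first three lines of the output above and checks that the ids arrive in increasing order. The check compares ids as strings, which assumes ids keep the fixed-width form shown in this example.

```python
import json

output = """\
{"id":"2024-04-11T14:51:24.695382398","data":"pong","source":"system","time":"2024-04-11T14:51:24.695Z","type":"system/pong"}
{"id":"2024-04-11T14:51:24.706355285","data":{"count":1207,"intervalSecs":60},"source":"system","time":"2024-04-11T14:51:24.706Z","type":"system/msgCount"}
{"id":"2024-04-11T14:54:23.887497871","data":"pong","source":"system","time":"2024-04-11T14:54:23.887Z","type":"system/pong"}
"""

def in_order(ids):
    """True if every id is strictly greater than the one before it."""
    return all(a < b for a, b in zip(ids, ids[1:]))

events = [json.loads(line) for line in output.splitlines() if line.strip()]
ids = [event["id"] for event in events]
print(len(events), in_order(ids))
```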

API /query

Returns an array of matching events as JSON.

Parameters:

q required if not in body The consumer specification or array of specifications.
pretty=true optional Pretty-print the response.

The consumer specification(s) can also be provided in the body of a POST request (without the q=).

The consumer specification is a JSON object with these properties:

patterns required An array of patterns (see below).
since optional String timestamp (like YYYY-MM-DD or YYYY-MM-DDTHH:MM:SSZ) or an event id. The consumer will receive events starting with the first event with an id greater than this value. Alternatively, you can use Go’s Duration syntax to specify an interval in the past (like 1h, which would mean “an hour ago”).
limit optional Integer n indicating that the consumer should receive no more than n events.
last optional Integer n indicating that the consumer should receive events starting with the n most recent matching events.

Note that the patterns property of a consumer specification is an array of patterns. If the consumer wants to specify limit and/or last separately for different sets of patterns, the request can provide an array of consumer specifications. In this case, the event ids that are returned in the response have a more complicated form to support what are essentially multiplexed streams. Each event still carries its unique id, and these ids are strictly ordered per consumer specification but not globally.
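
As a sketch of the POST form, the following builds (but does not send) a /query request with the consumer specification in the body. The Content-Type header is an assumption; the curl examples in this manual send curl's default content type, and this manual does not say which types the service expects. The helper name query_request is hypothetical.

```python
import json
import urllib.request

def query_request(spec, base="https://gaffa.reconometrics.com"):
    """Build a POST request for /query with the specification as the body."""
    body = json.dumps(spec).encode("utf-8")
    return urllib.request.Request(
        base + "/query?pretty=true",
        data=body,
        headers={"Content-Type": "application/json"},  # assumption
        method="POST",
    )

req = query_request({"last": 3, "limit": 10,
                     "patterns": [{"source": ["system"]}]})
print(req.get_method(), req.full_url)
# To send it: urllib.request.urlopen(req).read()
```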

API /get

Get an event by id.

Parameters:

id required The id of the desired event.
pretty=true optional Pretty-print the response.

Response is JSON representing the requested event if it exists.

API /rssfeed

Get a query result as an RSS feed.

Parameters:

title optional The title of the feed.
description optional The description of the feed.
pattern required The Gaffa event pattern for the desired events.
titlegen optional A jq-compatible program that should return a string. The event is provided as input to this program, and the output becomes the title for the item. The implementation is currently based on gojq.
descgen optional A jq-compatible program that should return a string. The event is provided as input to this program, and the output becomes the description for the item. The implementation is currently based on gojq.

The response is currently limited to 10 events.

Example:

curl -G \
     --data-urlencode 'pattern={"type":["system/msgCount"]}' \
     --data-urlencode 'title=Example RSS feed' \
     --data-urlencode 'description=Just some system messages.' \
     --data-urlencode 'titlegen="\(.count) Gaffa messages ingested"' \
     --data-urlencode 'descgen="\(.count) Gaffa messages ingested at \(.time)."' \
      https://gaffa.reconometrics.com/rssfeed

API /explain

Returns some query cost and planning data about a consumer request.

Parameters:

q required if not in body The consumer specification or array of consumer specifications.
pretty=true optional Pretty-print the response.

The consumer specification(s) can also be provided in the body of a POST request (without the q=).

The consumer specification is a JSON object with these properties:

patterns required An array of patterns (see below).
since optional String timestamp (like YYYY-MM-DD or YYYY-MM-DDTHH:MM:SSZ) or an event id. The consumer will receive events starting with the first event with an id greater than this value. Alternatively, you can use Go’s Duration syntax to specify an interval in the past (like 1h, which would mean “an hour ago”).
limit optional Integer n indicating that the consumer should receive no more than n events.
last optional Integer n indicating that the consumer should receive events starting with the n most recent matching events.
stream optional Boolean indicating whether the consumer will stream events or not.

API /ping

Pingpong with the service. This test is shallow and only exercises minimal functionality.

API /limits

Get current service limits.

API /throttle

Get current rate limits.

API /match

A utility API to test event pattern matching. Returns true or false to indicate if the given event matches the given pattern.

Parameters:

pattern required An event pattern.
event required An event (JSON).

Example:

curl -G \
  --data-urlencode 'pattern={"likes":["tacos"]}'  \
  --data-urlencode 'event={"likes":"tacos"}' \
  gaffa.reconometrics.com/match

API /schemas

Get JSON schemas related to the service.

Pattern matching

Introduction

This pattern matching is based on AWS EventBridge patterns with an implementation based on Quamina.

A pattern is an object with leaves that are arrays. Each leaf specifies a constraint for an atomic value in an event, which is an arbitrary object. A pattern leaf (an array) represents either a set of literal values or a constraint. An event matches a pattern if the properties of the event match the corresponding properties of the pattern. An array in an event matches a corresponding pattern if any element of the array matches the pattern.

In the context of pattern matching, an “event” is just an arbitrary JSON object.

Examples:

  1. The event {"likes":"tacos","when":"daily"} matches the pattern {"likes":["tacos","queso"]}.

  2. {"likes":["tacos","chips"],"when":"daily"} matches {"likes":["chips","queso"]}.

  3. {"x":42,"y":143} matches {"y":[{"numeric":[">",100]}]}.

  4. {"prepare":{"what":"tacos","count":3}} matches {"prepare":{"what":["tacos","queso"]}}.

  5. {"aux":{"conj":{"dist":1.23}}} matches {"aux":{"conj":{"dist":[{"exists":true}]}}}.

Patterns

A pattern is also a JSON object. Each leaf value is called a value constraint, and it specifies the requirement for the event’s value at the same path (if any).

A value constraint is an array (with zero or more elements).

A literal value constraint is a value constraint that is an array that contains only atomic literal values. Examples:

["chips", "queso", "tacos"]
[true, 1e2, 42, "guacamole"]

In the current implementation, an atomic value in a pattern is treated as a singleton array. This behavior departs from both EventBridge and Quamina, but it can be convenient.

For example:

{"likes":"tacos"}

is equivalent to

{"likes":["tacos"]}
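
One way to picture this equivalence is as a normalization step that wraps atomic leaves in singleton arrays before matching. The sketch below is an illustration only and is not taken from the Gaffa implementation; the function name normalize is hypothetical.

```python
def normalize(pattern):
    """Wrap atomic leaf values in singleton arrays; recurse into objects."""
    out = {}
    for key, value in pattern.items():
        if isinstance(value, dict):
            out[key] = normalize(value)      # nested object pattern
        elif isinstance(value, list):
            out[key] = value                 # already a value constraint
        else:
            out[key] = [value]               # atomic value -> singleton array
    return out

print(normalize({"likes": "tacos", "prepare": {"what": ["tacos", "queso"]}}))
```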

An extended constraint is a value constraint (an array) that contains a single object. The object typically contains a single property, which serves as name for the type of the constraint. The value for that property specifies the constraint in a manner that’s specific to the constraint type.

For example, the following value constraint is a numeric extended constraint.

[{"numeric":["<",42]}]

A pattern is an object such that the object’s properties are either object patterns or value constraints. See below for the supported extended constraints.

An event property name that starts with # is not available for pattern matching, and so a pattern with a property name that starts with # is not allowed.

Matching

An atomic value matches a literal value constraint if the value is present in the constraint’s array.

An array of atomic values matches a literal value constraint if any element of the array is present in the literal value constraint’s array.

An atomic value matches an extended constraint if the value satisfies the specific requirements of that extended constraint.

An object matches a pattern if all properties in the event pattern are present in the event. (Note {"exists":false} is not currently supported.)
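
The matching rules above can be sketched in a few lines. This is a simplified illustration that covers literal value constraints, nested object patterns, arrays in events, and the exists constraint with the value true; it is not Quamina's or Gaffa's implementation, and other extended constraints are deliberately left out. The function names are hypothetical.

```python
def _same(a, b):
    # Keep JSON true/false distinct from the numbers 1/0.
    return isinstance(a, bool) == isinstance(b, bool) and a == b

def _leaf_matches(value, constraint):
    if not isinstance(constraint, list):     # atomic value acts as a singleton array
        constraint = [constraint]
    candidates = value if isinstance(value, list) else [value]
    for item in constraint:
        if isinstance(item, dict):           # extended constraint
            if item == {"exists": True}:
                return True                  # the property is present
            raise NotImplementedError(f"unsupported constraint: {item}")
        if any(_same(v, item) for v in candidates):
            return True
    return False

def matches(event, pattern):
    """Return True if every property in the pattern is matched by the event."""
    for key, constraint in pattern.items():
        if key not in event:
            return False
        value = event[key]
        if isinstance(constraint, dict):     # nested object pattern
            candidates = value if isinstance(value, list) else [value]
            if not any(isinstance(v, dict) and matches(v, constraint)
                       for v in candidates):
                return False
        elif not _leaf_matches(value, constraint):
            return False
    return True

print(matches({"likes": ["tacos", "chips"], "when": "daily"},
              {"likes": ["chips", "queso"]}))
```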

Extended constraints

Here’s the current complete list of supported non-literal leaf predicates:

prefix per Quamina
exists per Quamina except that only "exists":true is supported
anything-but per Quamina
shellstyle per Quamina
numeric Similar to what EventBridge provides, with these additions:

!= and <> are synonymous and mean “not equal”

~= means “within 1e-8”

string Similar to what EventBridge provides for "numeric" except for strings, with this addition:

!= and <> are synonymous and mean “not equal”

equals-insentitive String equality without regard for case
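
As an illustration of the numeric constraint and its additions, here is a minimal sketch of an evaluator. It assumes the EventBridge convention of operator/operand pairs (for example [">", 0, "<=", 5]) and treats “within 1e-8” as an inclusive bound; both are assumptions about Gaffa's behavior. The name numeric_matches is hypothetical.

```python
import operator

_OPS = {
    "=": operator.eq,
    "!=": operator.ne,
    "<>": operator.ne,                        # synonym for !=
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
    "~=": lambda a, b: abs(a - b) <= 1e-8,    # inclusive bound is an assumption
}

def numeric_matches(value, spec):
    """Check value against operator/operand pairs such as [">", 0, "<=", 5]."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return False                          # only numbers can match
    if len(spec) % 2 != 0:
        raise ValueError("expected operator/operand pairs")
    pairs = zip(spec[::2], spec[1::2])
    return all(_OPS[op](value, operand) for op, operand in pairs)

# Corresponds to the value constraint [{"numeric": [">", 100]}].
print(numeric_matches(143, [">", 100]))
```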

Other capabilities

Dead Person’s Switches

Gaffa supports dead person’s switches that publish events when an expected event doesn’t arrive after a specified interval. An “event didn’t arrive” event has the following structure:

type system/dms
source system
event the event that went missing
last The timestamp the event was last seen
elapsedSecs The number of seconds elapsed since the previous event arrived

Each event published to Gaffa can be annotated with data to support these switches. We’re in the process of enabling many of the current event sources to provide this additional information when appropriate.
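
To illustrate the mechanism, the sketch below tracks the last arrival per key and reports keys that are overdue, producing objects with the properties listed above. It is a toy model under stated assumptions: the class name, the per-key interval handling, and the placement of these properties at the top level of the event are all hypothetical.

```python
from datetime import datetime, timedelta, timezone

class DeadPersonsSwitch:
    """Track the last arrival per key and report keys that are overdue."""

    def __init__(self, interval):
        self.interval = interval
        self.last_seen = {}                  # key -> (arrival time, event)

    def observe(self, key, event, now):
        self.last_seen[key] = (now, event)

    def check(self, now):
        """Return a system/dms event for every key that is overdue."""
        overdue = []
        for key, (seen, event) in self.last_seen.items():
            elapsed = (now - seen).total_seconds()
            if elapsed > self.interval.total_seconds():
                overdue.append({
                    "type": "system/dms",
                    "source": "system",
                    "event": event,
                    "last": seen.isoformat(),
                    "elapsedSecs": elapsed,
                })
        return overdue

t0 = datetime(2024, 4, 11, 14, 0, tzinfo=timezone.utc)
switch = DeadPersonsSwitch(timedelta(seconds=60))
switch.observe("BOU", {"station": "BOU"}, t0)
print(switch.check(t0 + timedelta(seconds=90)))
```

A production version would also need to avoid re-reporting the same gap on every check; that bookkeeping is omitted here.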

Novel event events

Certain events are published with keys that are used to generate events the first time that key has been seen. When Gaffa receives the first event with this key, Gaffa generates an event with the following form:

{"source":"system",
 "type":"system/novelty",
 "key":{},
 "event":{}}

The key value is the novelty key above, and the event value is the event that triggered this novelty event.
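
A minimal sketch of the idea: remember which keys have been seen and emit a novelty event only on the first sighting. The class name and the use of canonical JSON as a fingerprint for object-valued keys are assumptions made for this illustration, not a description of Gaffa's internals.

```python
import json

class NoveltyDetector:
    """Emit a system/novelty event the first time a key is seen."""

    def __init__(self):
        self._seen = set()

    def observe(self, key, event):
        """Return a novelty event for a new key, or None for a known key."""
        # Keys may be JSON objects, so use canonical JSON as a hashable form.
        fingerprint = json.dumps(key, sort_keys=True)
        if fingerprint in self._seen:
            return None
        self._seen.add(fingerprint)
        return {"source": "system", "type": "system/novelty",
                "key": key, "event": event}

detector = NoveltyDetector()
first = detector.observe({"station": "BOU"}, {"station": "BOU", "value": 1})
second = detector.observe({"station": "BOU"}, {"station": "BOU", "value": 2})
print(first is not None, second is None)
```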

Examples

The following examples use the Space Domain Events (SDE) service, which might be running here.

Get a stream of all events from the system source. In each case, we use events=none to get just JSON lines instead of server-sent events. We are also doing POSTs instead of GETs. To do a GET, pass the body of the POST as the value of the query parameter q.

cat<<EOF | curl -d @- https://gaffa.reconometrics.com/stream?events=none
{
  "last": 3,
  "patterns": [
    {
      "source": [
        "system"
      ]
    }
  ]
}
EOF
{"id":"2024-04-12T17:32:49.605963080","data":"pong","source":"system","time":"2024-04-12T17:32:49.605Z","type":"system/pong"}
{"id":"2024-04-12T17:33:19.606209609","data":"pong","source":"system","time":"2024-04-12T17:33:19.606Z","type":"system/pong"}
{"id":"2024-04-12T17:33:19.612978404","data":{"count":0,"intervalSecs":60},"source":"system","time":"2024-04-12T17:33:19.612Z","type":"system/msgCount"}

References

  1. EventBridge patterns
  2. Quamina pattern matching
  3. Server-sent events
  4. CloudEvents