Developing the Ingester Platform

[Figure: Ingester Platform components (_images/ingester_platform_components.png)]

Domain Objects

The domain of the ingester platform covers the projects, schemas, datasets, and ingester configurations that describe the data to be collected.

Project Metadata: Region

Not implemented

This represents a 2D area of the Earth’s surface.

Project Metadata: Location

This represents a point on the Earth's surface. The point has an elevation above MSL, and may optionally be part of a region.

Project Metadata: Schema

Describes the fields that can be part of a metadata entry. The currently supported datatypes are:

  • File
  • String
  • Integer
  • Double
  • Timestamp

Schemas support multiple inheritance, but all fields must be unique within the schema hierarchy. It is expected that you can search any schema, including base schemas, so any repository implementation will need to take this into account.
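
As an illustration only (the class and method names below are assumptions, not the actual jcudc24ingesterapi schema classes), enforcing field uniqueness across a schema hierarchy could look like:

class Schema(object):
    # Illustrative sketch: field names must be unique across the hierarchy.
    def __init__(self, fields, parents=()):
        self.fields = dict(fields)    # field name -> datatype
        self.parents = list(parents)

    def all_fields(self):
        """Collect fields from this schema and all base schemas."""
        collected = {}
        for schema in list(self.parents) + [self]:
            fields = schema.fields if schema is self else schema.all_fields()
            for name, dtype in fields.items():
                if name in collected:
                    raise ValueError("duplicate field: %s" % name)
                collected[name] = dtype
        return collected

base = Schema({"timestamp": "Timestamp"})
derived = Schema({"temp": "Double"}, parents=[base])
print(derived.all_fields())   # {'timestamp': 'Timestamp', 'temp': 'Double'}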

Project Metadata: Dataset

A dataset represents a collection of data entries at a particular location that conform to a particular schema. Optionally, a dataset will include a data source and a sampler to populate it. The location is considered to be the frame of reference for the dataset. Additional metadata may be applied to the dataset to convey more specific location information.

Ingester Metadata: Data Sources

Data sources retrieve data from a remote source and process it into data entries for ingestion into the repository.

URL Pull Data Source

This data source pulls a single resource from a specified URL (File, FTP, HTTP) and attaches it to the specified field of a new data entry. In the case of HTTP the Last-Modified header is used as the data entry timestamp. This will map the retrieved file to one specific field and produce one data entry.

Dataset Data Source

Retrieves data from another dataset in the repository, downloading the data attached to the triggering data entry. By default no new data entry is produced.

Ingester Metadata: Samplers

Samplers control when an ingestion event occurs in the case of a pull ingestion.

Periodic Sampler

The periodic sampler fires at a predetermined rate, specified in seconds.
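
As an illustration (the class name and state keys here are assumptions, not the platform's actual sampler API), the decision logic could be sketched as:

import datetime

class PeriodicSampler(object):
    # Illustrative sketch of a rate-based sampler.
    def __init__(self, rate):
        self.rate = rate  # seconds between ingestion events

    def sample(self, now, state):
        # state is the sampler state persisted between invocations
        last = state.get("last_run")
        if last is None or (now - last).total_seconds() >= self.rate:
            state["last_run"] = now
            return True   # the dataset should be queued for ingestion
        return False

state = {}
print(PeriodicSampler(60).sample(datetime.datetime.utcnow(), state))  # True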

New Data Sampler

Triggers whenever there is new data in another dataset. Passes the data entry ID and dataset as metadata to the data source. Only used with Dataset Data Source.

Ingester Service Components

The ingester platform is built using the http://twistedmatrix.com/ framework, which is a reactor framework. This means that requests, and services started by periodic schedulers, are serviced by a single thread. This has scaling implications, as blocking operations such as ingesting large amounts of data will affect API clients. To overcome this, the Ingester Platform has two service threads: the data source service and the ingester service. Both are part of the IngesterEngine class.

The data source service is fed through a queue by the processSamplers method. The running state of each data source is recorded to prevent multiple concurrent copies of the same data source running. Once the data is collected and processed, it is placed in an ingest queue.

The ingester service thread is fed from the ingest queue.
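
The threading model can be pictured with a highly simplified sketch (this is not the actual IngesterEngine code; the queue names and intervals are illustrative):

import queue
import threading

from twisted.internet import reactor, task

source_queue = queue.Queue()
ingest_queue = queue.Queue()

def process_samplers():
    # Main (reactor) thread: decide which datasets are ready and queue them.
    source_queue.put("dataset-1")

def process_queue():
    # Data source thread: run data sources, then queue their output for ingest.
    while True:
        dataset = source_queue.get()
        ingest_queue.put(("entries-for", dataset))

def process_ingest_queue():
    # Ingest thread: write queued data entries into the repository.
    while True:
        print("ingesting", ingest_queue.get())

threading.Thread(target=process_queue, daemon=True).start()
threading.Thread(target=process_ingest_queue, daemon=True).start()
task.LoopingCall(process_samplers).start(10.0)  # check samplers every 10 s
reactor.run()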

processSamplers - Main thread

This method is called periodically from a Twisted callback. Its purpose is to identify and queue datasets that are ready to ingest.
  1. Load the active datasets
  2. For each sampler:
     1. Skip if there is no sampler or if the ingester is already running
     2. Load the sampler state (processSampler)
     3. Run the sampler
     4. Persist the sampler state
     5. Queue the dataset if the sampler returned True

processQueue - Data source thread

This thread reads the dataset queue for new ingesters to run. It relies on the data source always returning or timing itself out (see the sketch after this list).
  1. Load data source state
  2. Create temporary directories
  3. Run data source
  4. If there is a post processing script run it
  5. Queue output data for ingest
  6. Persist data source state
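
A simplified sketch of these steps, using illustrative names rather than the platform's actual internals:

import shutil
import tempfile

def run_data_source(data_source, state, post_process=None):
    # Illustrative only: the real method names and state handling may differ.
    cwd = tempfile.mkdtemp()                      # 2. temporary working directory
    try:
        entries = data_source.fetch(cwd)          # 3. run the data source
        if post_process is not None:
            entries = post_process(cwd, entries)  # 4. optional post-processing script
        return entries, state                     # 5/6. queue for ingest, persist state
    finally:
        shutil.rmtree(cwd)                        # clean up the working directory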

processIngestQueue - Ingest thread

This thread ingests data into the repository. It relies on the repository access being thread safe.
  1. For each data entry:
     1. Ingest it into the repository

Ingesters

An ingester is an object that is invoked by the ingester platform to assemble zero or more Data Entries. Each ingester will need a domain object in the jcudc24ingesterapi.models.data_sources module, and an implementation in the dc24_ingester_platform.ingester.data_sources module. The domain object will need to expose each of its valid configuration parameters as plain Python properties, or as our typed properties.
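
As a rough illustration only (the class names, base classes, and method signatures below are assumptions, not the platform's actual API), the two halves of a new ingester might look like:

# Domain object, conceptually in jcudc24ingesterapi.models.data_sources:
# exposes the valid configuration parameters as plain Python properties.
class ExampleDataSource(object):
    def __init__(self, url=None, field=None):
        self.url = url      # where the data is fetched from
        self.field = field  # the data entry field to populate

# Implementation, conceptually in dc24_ingester_platform.ingester.data_sources:
# does the actual fetching when the platform invokes it.
class ExampleDataSourceImpl(object):
    def __init__(self, config):
        self.config = config

    def fetch(self, cwd):
        """Retrieve data into cwd and return zero or more data entries."""
        return []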

Testing from the command line

To test an ingester you can run it from the command line. To do this you will need to create a config file and a working directory, then invoke run_ingester <config> <cwd> [script]. The script takes two mandatory arguments and one optional argument. These are:

  • An ingester config file
  • A working directory
  • And optionally, a post processing script.

A sample config is

{
    "class": "pull_data_source",
    "state": {},
    "parameters": {},
    "config": {"url": "http://www.abc.net.au", "field": "page"}
}

Then, to run, you could call run_ingester pull.json /tmp, and you should see output such as:

Initial results
---------------
Time: 2012-12-19T22:52:42.000Z Dataset: None
     file = FileObject(f_path: outputfile0, mime_type: )

Time: 2012-12-19T22:56:32.000Z Dataset: None
     file = FileObject(f_path: outputfile1, mime_type: )


...
Processed results
-----------------
Time: 2013-02-06T10:32:04.493Z Dataset: None
     temp = 29.9375

Time: 2013-02-06T10:32:04.493Z Dataset: None
     temp = 29.75

...

Ingester Post Processing Scripts

A post processing script is called with a workspace directory (cwd) and the list of data entries being processed. The data entries it returns are those which will actually be ingested into the dataset:

import datetime
import os

from jcudc24ingesterapi.models.data_entry import DataEntry, FileObject

def process(cwd, data_entry):
    # The script receives the list of data entries produced by the data
    # source; this example only uses the first entry.
    data_entry = data_entry[0]
    ret = []
    with open(os.path.join(cwd, data_entry["file1"].f_path)) as f:
        for l in f.readlines():
            l = l.strip().split(",")
            if len(l) != 2: continue
            # One new data entry per CSV row, with the referenced file attached
            new_data_entry = DataEntry(timestamp=datetime.datetime.now())
            new_data_entry["a"] = FileObject(f_path=l[1].strip())
            ret.append(new_data_entry)
    return ret

Search Interface

A search will return a specific object type, based on a set of criteria that may reference other related objects. Complex searches could be constructed using a search tree marshalled to a prefix notation, which would then be used to generate the search queries. Complex situations may arise when crossing between ingester and repository objects.
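
For illustration, a criteria tree and its prefix-notation marshalling could look like this (the operator names are assumptions):

def to_prefix(node):
    # Flatten a criteria tree depth-first into a prefix-notation token list.
    if not isinstance(node, tuple):
        return [node]
    tokens = [node[0]]
    for child in node[1:]:
        tokens.extend(to_prefix(child))
    return tokens

search = ("AND",
          ("EQ", "schema_id", 42),
          ("GT", "timestamp", "2013-01-01T00:00:00Z"))
print(to_prefix(search))
# ['AND', 'EQ', 'schema_id', 42, 'GT', 'timestamp', '2013-01-01T00:00:00Z']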

Repository Integration

The ingester platform requires a repository in which to store the data that it processes. The repository is notified when any action occurs on project metadata, giving it an opportunity to store references back in the ingester platform database. The repository is also used exclusively for the storage of data, and is expected to support at least data storage and retrieval methods.
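
A minimal sketch of the contract this implies, with assumed method names:

class Repository(object):
    # Assumed interface for illustration; the platform's actual repository
    # classes may declare more methods (e.g. for metadata notifications).
    def store(self, dataset, data_entry):
        """Persist a data entry and return a reference to it."""
        raise NotImplementedError

    def retrieve(self, data_entry_id):
        """Fetch a previously stored data entry."""
        raise NotImplementedError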

DAM Integration

The DAM is the preferred repository for the ingester platform. Many of the project metadata objects map directly to DAM metadata objects. If only value criteria are allowed, then searches could be resolved by first querying one system and then the other. Initially only a subset of the attributes will be searchable.

Project Metadata    DAM Metadata
----------------    ------------
Region
Location            Location
Schema              Schema
Dataset             Dataset
DataEntry           Observation

Management API

The EnMaSSe Ingester Platform has no specific user interface of its own; rather, all interactions occur through the web services API. The API provides methods for creating and managing all domain objects and processes.

Object Manipulation

Management Processes

The main purpose of the EnMaSSe Ingester Platform is to manage data ingestion. The following methods enable this.

Enable Ingestion

enableDataset(dataset_id)

Disable Ingestion

disableDataset(dataset_id)

Reprocess Derived Data

runIngester(dataset_id)

It is possible to manually trigger an ingestion of derived data in the situation where there is a raw dataset containing data files, and a derived dataset that uses the raw dataset as a dataset data source. The caveat is that if there is already data in the derived dataset, invoking this process may create duplicate data entries.
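
Assuming, purely for illustration, an XML-RPC transport and a local endpoint (neither is confirmed by this document), the management calls might be invoked like this:

import xmlrpc.client

# Hypothetical: the transport, endpoint URL, and port are assumptions.
client = xmlrpc.client.ServerProxy("http://localhost:8080")
client.disableDataset(1)  # pause ingestion for dataset 1
client.enableDataset(1)   # resume ingestion for dataset 1
client.runIngester(1)     # manually reprocess derived data for dataset 1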

Retrieve Ingester Log

getIngesterEvents(dataset_id)

This method is used for retrieving all the available ingester logs. These can give insight into whether, and why, an ingester process is failing.