XRT

Simple Data Processing

INTRODUCTION

XRT is a programming-language-independent and resource-aware MapReduce runtime for multi-core systems. It can turn anything from resource-constrained single-board computers to large servers with over a hundred cores into high-performance data-processing environments.


FOR MEDIUM-DATA PROBLEMS

Most organizations do not have the data volumes to justify cluster-based data processing. XRT is an exploration of what a data-processing system might look like if it were built with medium-data problems in mind.

ANY PROGRAMMING LANGUAGE

Why is it considered reasonable to maintain a data stack separate from the production stack? XRT encourages collaboration across codebases and teams by enabling data processing in the same programming languages used in the rest of the organization.

LOW-LEVEL

When you reach for a general-purpose programming language to solve data problems, you are likely looking for flexibility or performance. XRT offers both: a flexible MapReduce programming model combined with high performance through low-level optimizations.

SIMPLE

XRT differs philosophically from most data-processing systems by being composed of a small set of carefully chosen features. XRT also offers zero configuration, single-binary deployment and cross-platform compatibility, resulting in low operational overhead and quick bootstrapping of development environments.

THE RUNTIME

The classical MapReduce programming model is a constant source of inspiration for XRT. However, to enable various features, the XRT runtime contains some fundamental differences. These differences make it easier to reason about the runtime, facilitate development and help deliver top performance.


MULTI-CORE ONLY

XRT is built to scale vertically on modern high core-count systems instead of horizontally across a cluster. As a result, XRT will benefit from high parallelism but avoids the performance implications and operational overhead of traditional cluster-based data processing runtimes.

NEWLINE-SEPARATED DATA

To enable programming-language independence, XRT only includes native support for reading and writing newline-separated data formats. However, XRT can be extended to process any data format by implementing the read or write logic within the mapper or reducer.
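As a sketch of that extension point, a mapper can parse a richer format itself and emit plain newline-separated text for XRT to route. Here the custom format is JSON lines, and the 'user_id' and 'event' field names are hypothetical, chosen purely for illustration:

import json
from sys import stdin, stdout

def json_to_tsv(line):
    # Read logic for the custom format lives in the mapper itself;
    # XRT only ever sees the newline-separated output.
    record = json.loads(line)
    # 'user_id' and 'event' are hypothetical field names.
    return f"{record['user_id']}\t{record['event']}\n"

def mapper():
    for line in stdin:
        stdout.write(json_to_tsv(line))

# In a real mapper script, mapper() would be called at the top level.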

PROCESS-BASED PARALLELISM

XRT enables parallel data processing by spawning multiple mappers and reducers as subprocesses and connecting them with the XRT runtime over stdin and stdout. This model, influenced by Hadoop Streaming, enables any programming language that can read and write from the standard streams to work with XRT.
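A minimal sketch of that stdin/stdout contract: pipe input lines through a subprocess and read its output back, as XRT does for each mapper and reducer. The shell command tr is just a stand-in for a real mapper command:

import subprocess

# Feed two input lines to a subprocess over stdin and collect its stdout.
# Any program that reads stdin and writes stdout could take tr's place.
result = subprocess.run(
    ['tr', 'a-z', 'A-Z'],
    input='hello\nworld\n',
    capture_output=True,
    text=True,
)
print(result.stdout, end='')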

RESOURCE-AWARE

XRT attempts to keep all intermediate data in memory to ensure optimal performance whenever possible. However, resource-awareness is a core concern, and XRT is capable of utilizing efficient disk-based data structures for processing vast volumes of data. In a recent test, XRT efficiently sorted 100 GB of data using 1 GB of memory.
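The general disk-backed technique can be sketched as a toy external merge sort: sort fixed-size chunks in memory, spill each sorted run to disk, then k-way merge the runs. This is an illustration of the technique, not XRT's actual implementation:

import heapq
import tempfile

def external_sort(lines, chunk_size=2):
    # 1) Sort fixed-size chunks in memory.
    chunks, buf = [], []
    for line in lines:
        buf.append(line)
        if len(buf) >= chunk_size:
            chunks.append(sorted(buf))
            buf = []
    if buf:
        chunks.append(sorted(buf))
    # 2) Spill each sorted chunk (a "run") to its own temporary file.
    runs = []
    for chunk in chunks:
        f = tempfile.TemporaryFile(mode='w+')
        f.writelines(item + '\n' for item in chunk)
        f.seek(0)
        runs.append(f)
    # 3) k-way merge the sorted runs; only one line per run is held
    #    in memory at any time.
    merged = [line.rstrip('\n') for line in heapq.merge(*runs)]
    for f in runs:
        f.close()
    return merged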

PERFORMANCE

Optimized XRT jobs are on average 2x faster than equivalent jobs running on the same system using popular distributed data-processing solutions. When XRT is run on a single high core-count system and compared to popular distributed data-processing solutions running on a networked cluster with an equal number of cores, the performance difference is even greater.

FLEXIBLE

XRT enables a flexible data processing environment by making input, output and reducer optional. Running a map-only job is very common for injective transformations like simple parallelized ETL problems. Omitting input or output is common when input or output logic is implemented within the mapper or reducer code directly.
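For instance, a map-only ETL job can simply drop the reducer flags. A sketch reusing the flags from the worked example below, where transform.py is a hypothetical map-only script:

xrt --mappers 4 \
    --memory 2g \
    --input input/part-* \
    --mapper "python transform.py" \
    --output output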

A QUICK EXAMPLE

Consider a dataset of user interactions split into multiple files:


input/part-0
input/part-1
...

Each row of each file contains information about a particular interaction where the first column contains a user-id:


283913	...
957294	...
973849	...
104582	...
...

The problem to solve is to count the total number of interactions per user. In this example, the mapper and reducer are written in Python.


The XRT invocation using 4 mappers, 4 reducers and a maximum of 2 GB of memory is as follows:


xrt --mappers 4 \
    --reducers 4 \
    --memory 2g \
    --input input/part-* \
    --mapper "python useractivity.py map" \
    --reducer "python useractivity.py reduce" \
    --output output

Here --mapper and --reducer are the commands used to spawn the mapper and reducer subprocesses. The mapper reads lines from stdin, splits them into columns and assigns a reducer-id based on the user-id, ensuring that every occurrence of a given user-id is processed by the same reducer. The reducer reads a sorted stream of user-ids from stdin, keeps a running tally and emits a user-id and count pair whenever it encounters a new user-id.


from os import environ
from sys import argv, stdin, stdout

def mapper():
    # The number of reducers is provided by XRT via the environment.
    reducers = int(environ['REDUCERS'])
    for line in stdin:
        uid = int(line.split('\t')[0])
        # Route identical user-ids to the same reducer.
        rid = uid % reducers
        stdout.write(f'{rid}\t{uid}\n')

def reducer():
    uid = stdin.readline().rstrip()
    if not uid:
        return  # no input: nothing to count
    count = 1
    for line in stdin:
        line = line.rstrip()
        if uid != line:
            # New user-id encountered: emit the tally for the previous one.
            stdout.write(f'{uid}\t{count}\n')
            uid = line
            count = 0
        count += 1
    # Emit the tally for the final user-id.
    stdout.write(f'{uid}\t{count}\n')

if argv[1] == 'map':
    mapper()
elif argv[1] == 'reduce':
    reducer()

The final output will end up as part files in the --output directory:


output/part-0
output/part-1
output/part-2
output/part-3

Where each part file contains the user-id and count pairs emitted by the reducers:


283913	142
957294	4
973849	40
104582	32
...
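Because the output is plain newline-separated text, the part files can be post-processed with a few lines of ordinary code. A sketch that sums the per-user counts across all parts as a sanity check:

from pathlib import Path

def total_interactions(output_dir):
    # Sum the second column (the count) across every part file.
    total = 0
    for part in sorted(Path(output_dir).glob('part-*')):
        for line in part.read_text().splitlines():
            uid, count = line.split('\t')
            total += int(count)
    return total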