A QUICK EXAMPLE
Consider a dataset of user interactions split into multiple files:
input/part-0.tsv
input/part-1.tsv
...
Each row of each file contains information about a particular interaction where the first column contains a user-id:
283913 ...
957294 ...
973849 ...
104582 ...
...
The problem to solve is to count the total number of interactions per user. The same solution is shown below in several programming languages.
The XRT invocation to use 4 mappers, 4 reducers and a maximum of 2g of memory is as follows:
# Python variant: the mapper and reducer are spawned as "python useractivity.py <mode>".
xrt --mappers 4 \
--reducers 4 \
--memory 2g \
--input input/part-*.tsv \
--mapper "python useractivity.py map" \
--reducer "python useractivity.py reduce" \
--output output
# Ruby variant: the mapper and reducer are spawned as "ruby useractivity.rb <mode>".
xrt --mappers 4 \
--reducers 4 \
--memory 2g \
--input input/part-*.tsv \
--mapper "ruby useractivity.rb map" \
--reducer "ruby useractivity.rb reduce" \
--output output
# PHP variant: the mapper and reducer are spawned as "php useractivity.php <mode>".
xrt --mappers 4 \
--reducers 4 \
--memory 2g \
--input input/part-*.tsv \
--mapper "php useractivity.php map" \
--reducer "php useractivity.php reduce" \
--output output
# Executable variant: the mapper and reducer are the ./useractivity binary.
# NOTE(review): presumably the build of the C# listing - TODO confirm.
xrt --mappers 4 \
--reducers 4 \
--memory 2g \
--input input/part-*.tsv \
--mapper "./useractivity map" \
--reducer "./useractivity reduce" \
--output output
# Executable variant: the mapper and reducer are the ./useractivity binary.
# NOTE(review): presumably the build of the Go listing - TODO confirm.
xrt --mappers 4 \
--reducers 4 \
--memory 2g \
--input input/part-*.tsv \
--mapper "./useractivity map" \
--reducer "./useractivity reduce" \
--output output
Here, --mapper and --reducer are the commands used to spawn the mapper and reducer subprocesses.
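The mapper reads lines from stdin, splits each line into columns, and assigns a reducer-id derived from the user-id (the user-id modulo the number of reducers, which is read from the REDUCERS environment variable). This ensures that every occurrence of a given user-id is processed by the same reducer.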
The reducer reads a sorted stream of user-ids from stdin while keeping a running tally and emitting a user-id to count pair when it encounters a new user-id.
from os import environ
from sys import argv, stdin, stdout
def mapper():
    """Assign every input row to a reducer based on its user-id.

    Reads rows from stdin, parses the text before the first tab as an
    integer user-id, and writes a tab-separated "reducer-id, user-id" line
    to stdout. The reducer-id is the user-id modulo the integer held in the
    REDUCERS environment variable, so equal user-ids always share a
    reducer-id.
    """
    bucket_count = int(environ['REDUCERS'])
    for row in stdin:
        # partition() yields the text before the first tab (or the whole row).
        user_id = int(row.partition('\t')[0])
        stdout.write(f'{user_id % bucket_count}\t{user_id}\n')
def reducer():
    """Count runs of identical user-ids read from stdin.

    Each input line is one user-id; lines are compared after stripping
    trailing whitespace. For every run of consecutive equal lines a
    tab-separated "user-id, count" line is written to stdout. The input is
    expected to be sorted so that equal user-ids are adjacent.

    Empty input produces no output (previously a bogus record with an empty
    user-id and a count of 1 was emitted).
    """
    # Function-scope import keeps this block self-contained.
    from itertools import groupby

    # groupby streams over stdin and yields nothing when there are no lines.
    for uid, run in groupby(line.rstrip() for line in stdin):
        stdout.write(f'{uid}\t{sum(1 for _ in run)}\n')
# Entry point: argv[1] selects the phase to run; any other value runs nothing.
phase = {'map': mapper, 'reduce': reducer}.get(argv[1])
if phase is not None:
    phase()
# Reads tab-separated rows from standard input and writes one
# "<reducer-id>\t<user-id>" line per row to standard output.
#
# The user-id is the first column parsed as an integer; the reducer-id is
# the user-id modulo the REDUCERS environment variable, so equal user-ids
# always map to the same reducer-id.
def mapper
  reducers = ENV['REDUCERS'].to_i
  $stdin.each_line do |line|
    # Double quotes are required here: '\t' in single quotes is a literal
    # backslash followed by "t", so the row would never actually be split.
    uid = line.split("\t").first.to_i
    rid = uid % reducers
    $stdout.write "#{rid}\t#{uid}\n"
  end
end
# Reads user-ids (one per line) from standard input and writes one
# "<user-id>\t<count>" line per run of consecutive equal ids to standard
# output. Lines are compared after stripping trailing whitespace, and the
# input is expected to be sorted so that equal ids are adjacent.
#
# Empty input produces no output (previously `readline` raised EOFError).
def reducer
  first = $stdin.gets
  return if first.nil? # nothing to count

  uid = first.rstrip
  count = 1
  $stdin.each_line do |line|
    line = line.rstrip
    if line == uid
      count += 1
    else
      # A new id starts: flush the finished run and begin counting again.
      $stdout.write "#{uid}\t#{count}\n"
      uid = line
      count = 1
    end
  end
  # Flush the final run, which no following id triggers.
  $stdout.write "#{uid}\t#{count}\n"
end
# Entry point: the first command-line argument selects the phase to run;
# any other value runs nothing.
mode = ARGV.first
if mode == "map"
  mapper
elsif mode == "reduce"
  reducer
end
<?php
/**
 * Reads tab-separated rows from STDIN and echoes one "<reducer-id>\t<user-id>"
 * line per row.
 *
 * The user-id is the first column cast to int; the reducer-id is the user-id
 * modulo the REDUCERS environment variable, so equal user-ids always map to
 * the same reducer-id.
 */
function mapper() {
    // getenv() works regardless of the variables_order ini setting, which
    // can leave $_ENV unpopulated.
    $reducers = (int)getenv("REDUCERS");
    // Compare against false explicitly so a falsy line such as "0" does not
    // end the loop early.
    while (($line = fgets(STDIN)) !== false) {
        $uid = (int)explode("\t", $line)[0];
        $rid = $uid % $reducers;
        // Emit exactly "rid<TAB>uid"; the former "line: " prefix corrupted
        // the reducer-id field.
        echo "$rid\t$uid\n";
    }
}
/**
 * Reads user-ids (one per line) from STDIN and echoes one
 * "<user-id>\t<count>" line per run of consecutive equal ids.
 *
 * Lines are compared after rtrim(); the input is expected to be sorted so
 * that equal ids are adjacent. Empty input produces no output.
 */
function reducer() {
    $first = fgets(STDIN);
    if ($first === false) {
        // No input at all: do not emit a bogus "\t1" record.
        return;
    }
    $uid = rtrim($first);
    $count = 1;
    // Explicit false check: a final unterminated "0" line is falsy but valid.
    while (($line = fgets(STDIN)) !== false) {
        $line = rtrim($line);
        // Strict comparison: with != numeric-looking strings such as "0123"
        // and "123" compare equal and their counts would be merged.
        if ($uid !== $line) {
            echo "$uid\t$count\n";
            $uid = $line;
            $count = 0;
        }
        $count++;
    }
    // Flush the final run, which no following id triggers.
    echo "$uid\t$count\n";
}
// Entry point: the first command-line argument selects the phase to run;
// any other value runs nothing.
$mode = $argv[1];
if ($mode == "map") {
    mapper();
} elseif ($mode == "reduce") {
    reducer();
}
?>
using System;
namespace userActivity
{
    /// <summary>
    /// Mapper/reducer pair that counts interactions per user-id. The first
    /// command-line argument ("map" or "reduce") selects the phase.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Reads tab-separated rows from standard input and writes one
        /// "reducer-id TAB user-id" line per row. The reducer-id is the
        /// user-id modulo the REDUCERS environment variable, so equal
        /// user-ids always map to the same reducer-id.
        /// </summary>
        static void mapper()
        {
            var reducers = Int32.Parse(Environment.GetEnvironmentVariable("REDUCERS"));
            for (var line = Console.ReadLine(); line != null; line = Console.ReadLine()) {
                var uid = Int32.Parse(line.Split('\t')[0]);
                var rid = uid % reducers;
                Console.WriteLine($"{rid}\t{uid}");
            }
        }

        /// <summary>
        /// Reads user-ids (one per line) from standard input and writes one
        /// "user-id TAB count" line per run of consecutive equal ids. The
        /// input is expected to be sorted so that equal ids are adjacent.
        /// Empty input produces no output.
        /// </summary>
        static void reducer()
        {
            var uid = Console.ReadLine();
            if (uid == null) {
                // No input at all: do not emit a bogus "\t1" record.
                return;
            }
            var count = 1;
            for (string line = Console.ReadLine(); line != null; line = Console.ReadLine()) {
                if (uid != line) {
                    Console.WriteLine($"{uid}\t{count}");
                    uid = line;
                    count = 0;
                }
                count++;
            }
            // Flush the final run, which no following id triggers.
            Console.WriteLine($"{uid}\t{count}");
        }

        /// <summary>
        /// Entry point. Unlike argv in C, <paramref name="args"/> does not
        /// contain the program name, so the mode is args[0].
        /// </summary>
        static void Main(string[] args)
        {
            if (args.Length == 0) {
                Console.Error.WriteLine("usage: useractivity map|reduce");
                Environment.ExitCode = 1;
                return;
            }
            switch (args[0]) {
                case "map":
                    mapper();
                    break;
                case "reduce":
                    reducer();
                    break;
            }
        }
    }
}
package main
import (
"bufio"
"fmt"
"log"
"os"
"strconv"
"strings"
)
// mapper reads tab-separated rows from stdin and writes one
// "<reducer-id>\t<user-id>" line per row to stdout. The user-id is the text
// before the first tab parsed as an integer; the reducer-id is the user-id
// modulo the REDUCERS environment variable. It returns the first parse,
// scan or write error encountered.
func mapper() error {
	reducers, err := strconv.Atoi(os.Getenv("REDUCERS"))
	if err != nil {
		return err
	}

	in := bufio.NewScanner(os.Stdin)
	out := bufio.NewWriter(os.Stdout)
	for in.Scan() {
		// Keep only the first column (the whole line when there is no tab).
		field := in.Text()
		if tab := strings.IndexByte(field, '\t'); tab >= 0 {
			field = field[:tab]
		}
		uid, err := strconv.Atoi(field)
		if err != nil {
			return err
		}
		if _, err := fmt.Fprintf(out, "%d\t%d\n", uid%reducers, uid); err != nil {
			return err
		}
	}
	if err := in.Err(); err != nil {
		return err
	}
	return out.Flush()
}
// reducer reads user-ids (one per line) from stdin and writes one
// "<user-id>\t<count>" line per run of consecutive equal ids to stdout.
// Empty input produces no output. It returns the first scan or write error
// encountered.
func reducer() error {
	in := bufio.NewScanner(os.Stdin)
	out := bufio.NewWriter(os.Stdout)

	// emit writes one finished run to the buffered output.
	emit := func(id string, n int) error {
		_, err := fmt.Fprintf(out, "%s\t%d\n", id, n)
		return err
	}

	if !in.Scan() {
		return in.Err()
	}
	current, seen := in.Text(), 1
	for in.Scan() {
		next := in.Text()
		if next == current {
			seen++
			continue
		}
		// A new id starts: flush the finished run and begin counting again.
		if err := emit(current, seen); err != nil {
			return err
		}
		current, seen = next, 1
	}
	if err := in.Err(); err != nil {
		return err
	}
	// Flush the final run, which no following id triggers.
	if err := emit(current, seen); err != nil {
		return err
	}
	return out.Flush()
}
// main runs the phase named by the first command-line argument ("map" or
// "reduce") and exits via log.Fatal if that phase returns an error. Any
// other argument value runs nothing.
func main() {
	phases := map[string]func() error{
		"map":    mapper,
		"reduce": reducer,
	}
	run, known := phases[os.Args[1]]
	if !known {
		return
	}
	if err := run(); err != nil {
		log.Fatal(err)
	}
}
The final output will end up as part files in the --output directory:
output/part-0
output/part-1
output/part-2
output/part-3
Each part file contains the user-id/count pairs emitted by the reducers:
283913 142
957294 4
973849 40
104582 32
...