Plumb User Guide

Overview

Plumb is a system for processing streams of large files. A Plumb workflow consists of a sequence of Jobs, each of which processes files from its input queue and produces output that can be queued for processing by one or more other Jobs.

A Job and its Worker

A Plumb Job consumes one or more inputs and creates one or more outputs.

Single Input and Output

By default, with one input and one output, data come in on stdin and go out on stdout. (The data live in HDFS, but your program does not need to care.)

A Plumb job is created for each input that appears on a queue of a given type. Plumb queues will be automatically created if they don’t exist.

A Plumb job is defined in YAML that specifies the program to run, its input queue, and its output queue.

For example:

- program: sz_to_xz
  input: pcap.sz
  output: pcap.xz

with the program (sz_to_xz):

#!/bin/sh
# sz_to_xz
szcat | xz -c

This job recompresses every file in the pcap.sz queue and puts the results on the pcap.xz queue.
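
Because a single-input, single-output program touches only stdin and stdout, it can be sanity-checked outside Plumb. A minimal sketch, assuming szcat and xz are installed locally and sample.pcap.sz is a hypothetical test file:

sh sz_to_xz < sample.pcap.sz > sample.pcap.xz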

The program line can take arguments, but it does not support arbitrary shell expressions such as pipelines (which is why sz_to_xz has to be a separate script). If the program line contains spaces or single quotes, enclose the whole program in double quotes. For example:

- program: "/user/plumb/jobserver/user-progs/plumbmover.sh  -M pstream -R1 -O day -v -v  -S'-i  /path/to/somekey' -H somebody@host.isi.edu - /some_destination/"

Multiple Outputs

For a pipeline stage whose total number of inputs and outputs exceeds two, Plumb passes the inputs and outputs on the command line, with an -i switch for each input and an -o switch for each output. Each -i or -o is followed by the path of a local FIFO with a name like 20190709-212835-00011659.ari.pcap.sz. These names let the user program robustly check which argument is which; it is not a good idea to rely on positional arguments (see the sketch after the example below). The user program reads from the input FIFOs and writes to the output FIFOs; Plumb feeds data into the input FIFOs and drains data from the output FIFOs.

A Plumb pipeline that starts with pcap.sz and ultimately ends with message_question.fsdb.xz:

-
    input:   pcap.sz
    program: "/user/plumb/jobserver/user-progs/dnsanon_wrapper_snzip.sh"
    output:  [ message_question.fsdb, message.fsdb ]
-
    input:   message_question.fsdb
    program: "/user/plumb/jobserver/user-progs/xz2"
    output:  message_question.fsdb.xz 

The program needs to reside somewhere in HDFS, in a location readable by the user configured to run Hadoop.
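
The first stage above has one input and two outputs, so its wrapper is invoked with -i and -o FIFO arguments. Below is a minimal sketch of such a wrapper; it is not the actual dnsanon_wrapper_snzip.sh, and my_dns_tool and its options are hypothetical stand-ins for the real processing step. The point is to match each FIFO by its name suffix rather than by position:

#!/bin/sh
# Illustrative wrapper: identify each FIFO by its name suffix, not its position.
in_fifo="" out_question="" out_message=""
while [ $# -gt 0 ]; do
    case "$1" in
        -i) in_fifo="$2"; shift 2 ;;
        -o) case "$2" in
                *message_question.fsdb) out_question="$2" ;;
                *message.fsdb)          out_message="$2"  ;;
            esac
            shift 2 ;;
        *)  shift ;;
    esac
done
# Decompress the input FIFO and fan out to both output FIFOs.
# (my_dns_tool is a hypothetical placeholder for the real processing program.)
szcat < "$in_fifo" | my_dns_tool --question-out "$out_question" --message-out "$out_message"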

Multiple Inputs

While Plumb can feed more than one input into a stage program, we currently do not have a working example of such a case. One caveat is that if the data on multiple inputs do not overlap in time, the processing semantics for the code are unclear. We hope that our ongoing work on advanced windowing will resolve this issue.

Job Options: Constraints and Rate Limiting

Other options can be provided in addition to input and output. Rate limiting (the maximum number of concurrent jobs) can be specified with:

    max_concurrent_jobs: 10
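
For example, a sketch of the sz_to_xz job from earlier with rate limiting added (the placement of the option alongside input and output within the job entry is assumed here):

- program: sz_to_xz
  input: pcap.sz
  output: pcap.xz
  max_concurrent_jobs: 10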

In the future we plan to add resource constraints. For example, core: 2 and memory_gb: 16 will reserve the given number of cores or amount of memory for the job. (Today each job gets 1 core and about 2 GB of memory by default.) This implies that the user program should strive to use just one core and should not buffer large amounts of data. We have not yet enabled YARN’s CPU enforcer, but we plan to do so in the future.

Windowing (experimental)

As an experimental feature, Plumb currently supports windowing, e.g.:

-
        input: rssacint
        program: "/mapreduce_wrapper.sh"
        output: rssac_mapreduce_stage1
        window:
            size:           "24 hours"
            start:          "2020-04-01-00-00-UTC"
            max_wait:       "3 hours"               
            cutoff_delivery_requirement: "Cutoff_Exactly_Once_To_Current_Window"
            use_queue_sequence_number_instance: "Yes"
            queue_sequence_number_instance_list: "lax,mia,ari"

or

-
    input:   message.fsdb
    program: "/user/aqadeer/jobserver/user-progs/rssac_wrapper.sh"
    output:  rssacint
    window:
      start_and_end:    "2020-04-02-00-00-UTC to 2020-04-05-00-00-UTC"

Stateful Streaming

For stateful streaming, use this style of YAML:

-
    input:   pcap.sz
    program: "/user/plumb/jobserver/user-progs/zeek_app.sh"
    output:  [weird.log, notice.log ]
    abstraction: stateful_streaming

(xxx need more details here)

Environment for user programs

When Plumb runs a user program, the following environment variables are available to the user job:

PLUMB_INPUTS

  • Comma-separated full path(s) to the HDFS input file(s)
  • Example:

    PLUMB_INPUTS=/user/plumb/hle/raw/message_question.fsdb/conqdata/20190717-174112-01529933.lax.message_question.fsdb

PLUMB_OUTPUTS

  • Comma-separated full path(s) to the HDFS output file(s), or an empty string if the user used the keyword “None” for one of the stage’s outputs
  • Example:

    PLUMB_OUTPUTS=/user/plumb/hle/raw/message_question.fsdb.xz/20190717-174112-01529933.lax.message_question.fsdb.xz
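
A minimal sketch of a user script that uses these variables, for example to record which HDFS files an invocation corresponds to (the logging shown here is only an illustration):

#!/bin/sh
# Log the HDFS paths this run corresponds to (illustrative).
echo "inputs:  $PLUMB_INPUTS"  >&2
echo "outputs: $PLUMB_OUTPUTS" >&2

# Walk the comma-separated input list, in case a stage has several inputs.
old_ifs=$IFS; IFS=,
for f in $PLUMB_INPUTS; do
    echo "input file: $f" >&2
done
IFS=$old_ifs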

Managing Jobs as a User

Jobs have 4 possible states: submitted, accepted, rejected, retracted. They start out submitted, then become accepted once they start executing (or rejected if they are misconfigured). An accepted job becomes retracted when a user asks to have it removed.

Submitting Jobs

Run /usr/local/bin/plumb-job on any system where Plumb is installed to see the list of available commands. New jobs are described in a YAML file and submitted with:

/usr/local/bin/plumb-job submitJob /tmp/szToxz.yaml

Note that the Plumb application server must wait for all currently running jobs on the YARN cluster to finish and then restart in order to incorporate submit or retract changes across all users. This process can take a while because many jobs may be running on the cluster.
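
For example, a complete sketch of the submission flow, reusing the sz_to_xz job definition from earlier:

cat > /tmp/szToxz.yaml <<'EOF'
- program: sz_to_xz
  input: pcap.sz
  output: pcap.xz
EOF
/usr/local/bin/plumb-job submitJob /tmp/szToxz.yaml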

The current YARN cluster status can be monitored via the YARN status page: http://<head-node>:8088/cluster/nodes

Once submitted, the job will start as soon as all currently running jobs complete. (An implication is that submitting a new job requires draining the queue, so don’t do it casually.)

Retracting Jobs

Run

/usr/local/bin/plumb-job retractJob JOB-URI

Just as in the submission case, all currently running jobs first need to finish; after that, the job will be retracted.

(xxx: what are job-uris? are they the lines from listAcceptedJobs?)

Showing the Workflow

plumb-job listAcceptedJobs   # lists accepted jobs for you (the current user)
plumb-job listSubmittedJobs
plumb-job listRejectedJobs
plumb-job listRetractedJobs

System Commands

Create a pipeline.yml file in the current directory with details for all jobs in the system:

plumb-job getOptYamlText

Create a pipeline.png file showing the diagram of the optimized workflow:

plumb-job getOptYamlDiagram

Watching Data flow

The command /usr/local/bin/plumb allows one to watch data flow through the system.

Enqueue a file <filename> into the queue <qname>:

plumb put <filename> <qname>

List all of the current active queue names:

plumb ls-q

List currently queued files (aka large blocks) for a given <qname>:

plumb ls-f <qname>

Remove a file from a queue:

plumb rm-f <qname> <filename> 

Display a queue entry checksum:

plumb queue-entry-checksum <file-uri>

Dump a file’s content to stdout:

plumb queue-entry-cat <file-uri>

Note: it is advisable to enclose <qname> in double quotes, because some characters in queue names (such as dots), if provided unquoted, can cause the program to give wrong results.
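
For example, to list the files queued on the pcap.sz queue used earlier:

plumb ls-f "pcap.sz"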

Managing Faults

List faulty files in a queue:

plumb ls-faulty

Mark a file in a queue as faulty:

plumb queue-entry-fault

Mark a file as healthy so that Plumb retries processing it:

plumb queue-entry-restore

Managing Windowing

Please note that the windowing functionality is not well tested and cannot be used in a production environment at this time.

List the queues that have windows:

plumb ls-windowed-q

List the possible states a window can be in:

plumb ls-all-window-statuses

List the windows inside a queue:

plumb ls-win

List the windows in a queue that are in a specific status:

plumb ls-win-in-status

Set the status of a window (this can be used to artificially mark a window FORMED so that it is run for processing):

plumb set-win-status

List the files in a window:

plumb ls-f-in-win

Change the window number of a file:

plumb mv-f-in-win

Experimental functions to facilitate debugging (xxx: don’t work):

plumb ls-missing-files-in-a-window, ls-site-info-in-a-window

Note: unlike the other commands in the plumb tool, all windowing functions need some state from Plumb. Therefore the HLE/Window Manager must be operational and up to date for the above windowing functions to work.

The course of action if the new windowing feature does not work as expected:

  1. See if the window is complete, using the plumb command-line windowing functions.
  2. If you think the window should have triggered but it didn’t, manually set the window status to FORMED.
  3. If doing the above still doesn’t trigger the window within about 6 minutes, save the plumb and HLE logs somewhere so that the Plumb maintainer can see what went wrong. If a failure happens after the window job runs, also save the YARN log of that job.

Dealing with Failed Jobs

If a user job fails, the user needs to understand what went wrong. Plumb generates substantial diagnostic data for each job to facilitate such root-cause analysis. Users can also generate their own debugging data from their code (e.g., using set -x in a bash script). To see all this debugging data, do the following:

  1. Go to the cluster status page (something like http://<headnode>:8088/cluster/apps/)
  2. Find your failed job. Specifically, you need the ID of the failed job (something like application_1568404903922_5011798)
  3. Open a terminal on any of the cluster machines and run the following command to view the YARN logs:

    yarn logs -applicationId application_1568404903922_5011798
    

Feeding data to Plumb

Data can be put into a desired queue simply by running:

plumb put <fileuri> <qname>

Where:

<fileuri> points either to a local file or to a file in HDFS

<qname> is one of the queues created by running plumb-job submitJob and displayed by running plumb ls-q
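
For example, to enqueue a local capture file into the pcap.sz queue used earlier (the file path is only an illustration):

plumb put /tmp/20190709-212835-00011659.ari.pcap.sz "pcap.sz"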