
Process Flow


This is a simple example of a process flow that makes use of the Spawn Task/Actor. The actors are Ruby applications. In a real-world deployment the Ruby applications would be more substantial, possibly writing to a database, transforming JSON, or doing whatever else is necessary for the system being built.

What it does

The process flow is initiated by using curl to send a JSON record to Piper. Piper processes the record according to a flow diagram that runs several steps, makes a decision, and follows one of two possible branches before terminating.

How it does it

The behavior demonstrated by this sample is specified by two files: flowing_cfg.json and the flowing.flow diagram file.

When any JSON record is pushed to Piper, it matches the 'flowing' group. The 'flowing' group uses the 'flowing' process flow, which is defined in the flowing.flow file. Changing the group's filter would categorize the incoming records differently, but for this sample a blank filter catches all pushed JSON.

The individual tasks are defined in the flowing.flow JSON file. The entry to the Flow is at the chglog Task, which writes the incoming JSON to the change log. Take a look at the change log after running the sample to see the entries.

The next Task in the Flow spawns a Ruby script called number.rb. The script makes use of the piper-ruby gem to get set up for processing. After that it is pretty simple. A counter is incremented and a phrase is added to the 'wall' element of a Hash before the resulting Hash is shipped on a link named either 'odd' or 'even'.

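require 'piper'

# Global counter; it persists across records for as long as this process runs.
$counter = 0

Piper::Actor::process() { |id,rec|
  # Create the 'wall' array on first use, then append this task's phrase.
  rec['wall'] = [] if rec['wall'].nil?
  rec['wall'] << "count me in"

  # Number the record with the running count.
  $counter += 1
  rec['number'] = $counter

  # Pick the outgoing link from the parity of the counter.
  if 0 == ($counter % 2)
    link = "even"
  else
    link = "odd"
  end

  # Send the updated record back to Piper on the chosen link.
  Piper::Actor::ship(id, rec, link)
}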
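
The branching decision can be tried out without a running Piper by pulling it into a plain method. The following is only a sketch: route_record is a hypothetical helper, not part of the piper-ruby gem, and it returns the link name instead of calling Piper::Actor::ship.

```ruby
# Hypothetical helper (not part of piper-ruby): mirrors the logic of
# number.rb but returns the record and link name instead of shipping them.
def route_record(rec, counter)
  rec['wall'] ||= []
  rec['wall'] << 'count me in'
  rec['number'] = counter
  link = counter.even? ? 'even' : 'odd'
  [rec, link]
end

# Simulate three records arriving one after another.
results = (1..3).map { |n| route_record({}, n) }
results.each { |rec, link| puts "#{rec['number']} -> #{link}" }
```

The first and third records take the 'odd' link and the second takes the 'even' link, which matches what the sample does once the counter starts at 1.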

The Flow execution follows either the odd or even link and ends up at the odd or even Task. The application or actor for each of those Tasks is another Ruby script called write_on_wall.rb. It writes on the wall by appending to the 'wall' element of the Hash used in the previous Task.

require 'piper'

Piper::Actor::process() { |id,rec|
  rec['wall'] = [] if rec['wall'].nil?
  rec['wall'] << ARGV[0]

  Piper::Actor::ship(id, rec, nil)
}

Each of the applications runs in its own thread and as a separate process. That process continues to run and wait for new JSON records, so unless the application exits there is no need to restart it. If the app does exit, it will be restarted when the next message needs processing.

As Piper is processing the Flow it will continue to send new JSON messages to the Ruby application. If the Ruby application handles them asynchronously, Piper will match each response to its request by the 'id' in the response and continue in whatever order the spawned application replies. It is also able to have multiple requests outstanding simultaneously.
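
The idea of matching replies by id can be pictured with a few lines of plain Ruby. This is a conceptual model only, not Piper's implementation; the ids and records are made up for illustration.

```ruby
# Conceptual model only, not Piper's implementation: the ids and records
# below are made up for illustration.
pending = {
  'a' => { 'number' => 1 },
  'b' => { 'number' => 2 },
  'c' => { 'number' => 3 }
}

# Replies come back in whatever order the actor finished them.
replies = [
  ['b', { 'number' => 2, 'done' => true }],
  ['c', { 'number' => 3, 'done' => true }],
  ['a', { 'number' => 1, 'done' => true }]
]

completed = []
replies.each do |id, rec|
  # The id is the only thing tying a reply to its outstanding request.
  raise "unexpected id #{id}" unless pending.delete(id)
  completed << [id, rec]
end

puts completed.map(&:first).join(', ')
```

Three requests are outstanding at once, and each one is completed as its reply arrives, regardless of the order in which the requests were sent.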

The number task created a branch. Those branches converge on the final Task, which is another Ruby script named printer.rb. Its only job is to print out the id and message it received. It is the last Task in the flow.

require 'piper'

Piper::Actor::process() { |id,rec|
  $stderr.puts "#{id}: #{rec}"
}

Note that the output is on stderr and not stdout. The exchange with Piper is on stdin and stdout, so anything written on stdout would be accepted as a continuation of the flow and would have to be in JSON format. Stderr is what spawned task actors use for debugging and display output.
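
The separation of the two streams can be shown in isolation. The sketch below is an illustration only: emit is a hypothetical helper, and writing one JSON document per line is an assumption rather than Piper's documented wire format. In a real actor the piper-ruby gem handles the stdout exchange.

```ruby
require 'json'
require 'stringio'

# Hypothetical helper: data goes to one stream as JSON, diagnostics go to
# another. The one-document-per-line framing is an assumption made for this
# sketch, not Piper's documented wire format.
def emit(rec, data_out: $stdout, debug_out: $stderr)
  debug_out.puts "debug: emitting keys #{rec.keys.join(',')}"
  data_out.puts JSON.generate(rec)
end

# Capture both streams to show that the data stream stays pure JSON.
data = StringIO.new
debug = StringIO.new
emit({ 'wall' => ['count me in'] }, data_out: data, debug_out: debug)
parsed = JSON.parse(data.string)
$stderr.puts "data stream parsed as JSON: #{parsed.inspect}"
```

Because the debug text never touches the data stream, everything on that stream still parses as JSON.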

Instructions

First, Piper must be started. Piper is started with the flowing_cfg.json file, which has the publish logging category turned on. That will generate a log entry for each task entry and exit. It is a bit much for a running system but a good way to see what is happening when exploring or debugging.

$> bin/piper -c samples/flowing/flowing_cfg.json

For a simple exercise just send an empty JSON object. In a separate terminal change to the piper/samples/flowing directory and use curl to push a record to Piper.

$> curl -T empty.json http://localhost:7660/record/aaa
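
The same push can be sketched in Ruby with the standard net/http library. curl -T uploads the file with an HTTP PUT, so the sketch builds a PUT request to the same URL. The helper names are hypothetical, and the Content-Type header is an assumption (curl -T does not set one).

```ruby
require 'net/http'
require 'uri'

# Hypothetical helper: build a PUT request equivalent to
# `curl -T empty.json <url>`. The Content-Type header is an assumption.
def build_push_request(url, json_text)
  uri = URI(url)
  req = Net::HTTP::Put.new(uri)
  req['Content-Type'] = 'application/json'
  req.body = json_text
  [uri, req]
end

# Hypothetical helper: send the request. Only call this with Piper running.
def push_record(url, json_text)
  uri, req = build_push_request(url, json_text)
  Net::HTTP.start(uri.host, uri.port) { |http| http.request(req) }
end

# Build (but do not send) a push of an empty JSON object.
uri, req = build_push_request('http://localhost:7660/record/aaa', '{}')
puts "#{req.method} #{uri}"
```

With Piper running, push_record('http://localhost:7660/record/aaa', File.read('empty.json')) would send the record from the piper/samples/flowing directory.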

Curl is not the quickest way to push JSON to Piper but it is easy. It may be hard to tell without the logging on, but with it on there should be a noticeable difference in speed the second and subsequent times that the curl push is invoked. The first time, the Ruby applications are being started. After that, those applications continue to receive and send messages without restarting.

Building on the sample

There are many ways of extending this sample. Some of the enhancements include