Wednesday, October 8, 2014

PIG basics


In how many ways can Pig Latin commands be executed?

Three.

Through the Grunt interactive shell, through a script file, and as embedded queries inside Java programs.

Which command do you use to enter the Grunt shell?

pig

Which command do you use to run a Pig script?

pig myscript.pig

What does it mean to run Pig in Hadoop mode?

Running Pig in Hadoop mode means the compiled Pig program will physically execute in a Hadoop installation.


To enter the Grunt shell in local mode:

pig -x local

To enter the Grunt shell in Hadoop mode:

pig -x mapreduce

If no argument is supplied, Pig defaults to mapreduce mode.


grunt> set debug on
grunt> set job.name 'my job'


The debug parameter states whether debug-level logging is turned on or off. The job.name parameter takes a single-quoted string and will use that as the Pig program’s Hadoop job name.



The exec command executes a Pig script in a separate space from the Grunt shell.

The run command executes a Pig script in the same space as Grunt (also known as interactive mode).



grunt> log = LOAD 'tutorial/data/excite-small.log' AS (user, time, query);

The above command loads the data into an alias (variable) called log.



Pig parses your statements but doesn’t physically execute them until you use a DUMP or STORE command to ask for the results.

The DUMP command prints out the content of an alias whereas the STORE command stores the content to a file.


The LIMIT command allows you to specify how many tuples (rows) to return. For example, to see four tuples of log:
grunt> lmt = LIMIT log 4;
grunt> DUMP lmt;

The above statements output only 4 tuples.




grunt> log = LOAD 'tutorial/data/excite-small.log' AS (user:chararray, time:long, query:chararray);
grunt> grpd = GROUP log BY user;
grunt> cntd = FOREACH grpd GENERATE group, COUNT(log);
grunt> STORE cntd INTO 'output';

The above script is equivalent to the following SQL statement:

SELECT user, COUNT(*) FROM excite-small.log GROUP BY user;
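
To make the GROUP and COUNT steps concrete, here is a minimal plain-Java sketch of the same computation. It only illustrates the semantics and is not how Pig runs the job; the names GroupCountSketch, LogRecord, and countByUser are made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; this is not how Pig executes the script.
// GroupCountSketch, LogRecord, and countByUser are hypothetical names.
class GroupCountSketch {
    static final class LogRecord {
        final String user;
        final long time;
        final String query;

        LogRecord(String user, long time, String query) {
            this.user = user;
            this.time = time;
            this.query = query;
        }
    }

    // Mirrors: grpd = GROUP log BY user;
    //          cntd = FOREACH grpd GENERATE group, COUNT(log);
    static Map<String, Long> countByUser(List<LogRecord> log) {
        Map<String, Long> counts = new TreeMap<>(); // sorted keys for stable printing
        for (LogRecord r : log) {
            counts.merge(r.user, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<LogRecord> log = Arrays.asList(
            new LogRecord("u1", 1L, "pig latin"),
            new LogRecord("u2", 2L, "hadoop"),
            new LogRecord("u1", 3L, "grunt shell"));
        System.out.println(countByUser(log)); // prints {u1=2, u2=1}
    }
}
```

Each map entry corresponds to one (group, count) tuple of cntd.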


What is the difference between SQL and Pig Latin?

Pig Latin is a data processing language. You’re specifying a series of data processing steps instead of a complex SQL query with clauses.

In SQL, we define a relation’s schema before it’s populated with data. Pig takes a much looser approach to schema. In fact, you don’t need to use schemas if you don’t want to, which may be the case when handling semistructured or unstructured data.


You can expose Pig’s schema for any relation with the DESCRIBE command:

grunt> DESCRIBE log;
log: {user: chararray,time: long,query: chararray}

A GROUP BY operation on the relation log generates the relation grpd. Based on the operation and the schema for log, Pig infers a schema for grpd:

grunt> DESCRIBE grpd;

grpd: {group: chararray,log: {user: chararray,time: long,query: chararray}}

The relation grpd has two fields, group and log. The field log is a bag with subfields user, time, and query.


grunt> DESCRIBE cntd;
cntd: {group: chararray,long}


ILLUSTRATE does a sample run to show a step-by-step process on how Pig would compute the relation.


EXPLAIN:   EXPLAIN [-out path] [-brief] [-dot] [-param ...]
[-param_file ...] alias;
Display the execution plan used to compute a relation. When used with a script name, for example, EXPLAIN myscript.pig, it will show the execution plan of the script.


In order for ILLUSTRATE to work, the load command in the first step must include a schema.

Subsequent transformations must not include the LIMIT or SPLIT operators, the nested FOREACH operator, or the map data type.




Data types and schemas:

Fields default to bytearray unless specified otherwise.

int          Signed 32-bit integer
long         Signed 64-bit integer
float        32-bit floating point
double       64-bit floating point
chararray    Character array (string) in Unicode UTF-8
bytearray    Byte array (binary object)

Pig’s data model from the top down

Tuple    :


(12.5,hello world,-2)
A tuple is an ordered set of fields. It’s most often used as a row in a relation. It’s represented by fields separated by commas, all enclosed by parentheses.    

Bag:

{(12.5,hello world,-2),(2.87,bye world,10)}
A bag is an unordered collection of tuples. A relation is a special kind of bag, sometimes called an outer bag. An inner bag is a bag that is a field within some complex type.
A bag is represented by tuples separated by commas, all enclosed by curly brackets.
Tuples in a bag aren’t required to have the same schema or even have the same number of fields. It’s a good idea to do this though, unless you’re handling semistructured or unstructured data.

Map:

[key#value]
A map is a set of key/value pairs. Keys must be unique and be a string (chararray). The value can be any type.
You reference fields inside maps through the pound operator instead of the dot operator. For a map named m, the value associated with key k is referenced through m#k.


Users can define schemas for relations using the AS keyword with the LOAD, STREAM, and FOREACH operators.





Relational operators




UNION combines multiple relations together whereas SPLIT partitions a relation into multiple ones. An example will make it clear:
grunt> a = load 'A' using PigStorage(',') as (a1:int, a2:int, a3:int);
grunt> b = load 'B' using PigStorage(',') as (b1:int, b2:int, b3:int);
grunt> DUMP a;
(0,1,2)
(1,3,4)
grunt> DUMP b;
(0,5,2)
(1,7,8)
grunt> c = UNION a, b;
grunt> DUMP c;
(0,1,2)
(0,5,2)
(1,3,4)
(1,7,8)
grunt> SPLIT c INTO d IF $0 == 0, e IF $0 == 1;
grunt> DUMP d;
(0,1,2)
(0,5,2)
grunt> DUMP e;
(1,3,4)
(1,7,8)
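
As a rough mental model of the session above, UNION behaves like concatenating two lists, and each SPLIT branch behaves like a filter on the result. The following plain-Java sketch assumes tuples are modeled as int arrays; UnionSplitSketch and its method names are hypothetical and not part of Pig.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration; tuples are modeled as int[] for brevity.
// UnionSplitSketch, union, and whereFirstFieldEquals are hypothetical names.
class UnionSplitSketch {
    // UNION: keep every tuple from both inputs (duplicates are not removed).
    static List<int[]> union(List<int[]> a, List<int[]> b) {
        List<int[]> c = new ArrayList<>(a);
        c.addAll(b);
        return c;
    }

    // One branch of SPLIT c INTO ... IF $0 == key.
    static List<int[]> whereFirstFieldEquals(List<int[]> relation, int key) {
        List<int[]> out = new ArrayList<>();
        for (int[] tuple : relation) {
            if (tuple[0] == key) {
                out.add(tuple);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> a = Arrays.asList(new int[] {0, 1, 2}, new int[] {1, 3, 4});
        List<int[]> b = Arrays.asList(new int[] {0, 5, 2}, new int[] {1, 7, 8});
        List<int[]> c = union(a, b);
        for (int[] t : whereFirstFieldEquals(c, 0)) {
            System.out.println(Arrays.toString(t)); // the tuples that land in d
        }
    }
}
```

Unlike this list-based sketch, a Pig relation is an unordered bag, so the order of tuples in DUMP output should not be relied on.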

You can use the DISTINCT operator to remove duplicates from a relation.



You can simulate SPLIT by multiple FILTER operators. The FILTER operator alone trims a relation down to only tuples that pass a certain test:
grunt> f = FILTER c BY $1 > 3;
grunt> DUMP f;
(0,5,2)
(1,7,8)


SAMPLE is an operator that randomly samples tuples in a relation according to a specified percentage.



grunt> g = GROUP c BY $2;
grunt> DUMP g;
(2,{(0,1,2),(0,5,2)})
(4,{(1,3,4)})
(8,{(1,7,8)})
grunt> DESCRIBE c;
c: {a1: int,a2: int,a3: int}
grunt> DESCRIBE g;
g: {group: int,c: {a1: int,a2: int,a3: int}}



The first field of GROUP’s output relation is always named group.


With GROUP ... ALL, one can put all tuples in a relation into one big bag. This is useful for aggregate analysis on relations, as functions work on bags but not relations. For example:


grunt> h = GROUP c ALL;
grunt> DUMP h;
(all,{(0,1,2),(0,5,2),(1,3,4),(1,7,8)})
grunt> i = FOREACH h GENERATE COUNT($1);
grunt> dump i;
(4L)





Now that you’re comfortable with GROUP, we can look at COGROUP, which groups together tuples from multiple relations. It functions much like a join. For example, let’s cogroup a and b on the third column.
grunt> j = COGROUP a BY $2, b BY $2;
grunt> DUMP j;
(2,{(0,1,2)},{(0,5,2)})
(4,{(1,3,4)},{})
(8,{},{(1,7,8)})
grunt> DESCRIBE j;
j: {group: int,a: {a1: int,a2: int,a3: int},b: {b1: int,b2: int,b3: int}}




Whereas GROUP always generates two fields in its output, COGROUP always generates three (more if cogrouping more than two relations). The first field is the group key, whereas the second and third fields are bags. These bags hold tuples from the cogrouping relations that match the grouping key.


If a grouping key matches only tuples from one relation but not the other, then the field corresponding to the nonmatching relation will have an empty bag. To ignore group keys that don’t exist for a relation, one can add the INNER keyword to the operation, as follows:



grunt> j = COGROUP a BY $2, b BY $2 INNER;
grunt> dump j;
(2,{(0,1,2)},{(0,5,2)})
(8,{},{(1,7,8)})
grunt> j = COGROUP a BY $2 INNER, b BY $2 INNER;
grunt> dump j;
(2,{(0,1,2)},{(0,5,2)})
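
The effect of INNER can be sketched in plain Java: build one pair of bags per key, then drop any group whose bag is empty for a relation marked INNER. This is an illustration of the semantics shown above, not Pig's implementation, and CogroupSketch and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of COGROUP a BY $k [INNER], b BY $k [INNER].
// CogroupSketch, Group, and cogroup are hypothetical names.
class CogroupSketch {
    // One output tuple: the bag of matching tuples from a and from b.
    static final class Group {
        final List<int[]> fromA = new ArrayList<>();
        final List<int[]> fromB = new ArrayList<>();
    }

    static Map<Integer, Group> cogroup(List<int[]> a, List<int[]> b, int keyIndex,
                                       boolean innerA, boolean innerB) {
        Map<Integer, Group> groups = new TreeMap<>();
        for (int[] t : a) {
            groups.computeIfAbsent(t[keyIndex], k -> new Group()).fromA.add(t);
        }
        for (int[] t : b) {
            groups.computeIfAbsent(t[keyIndex], k -> new Group()).fromB.add(t);
        }
        // INNER on a relation drops every group whose bag for that relation is empty.
        groups.values().removeIf(g ->
            (innerA && g.fromA.isEmpty()) || (innerB && g.fromB.isEmpty()));
        return groups;
    }

    public static void main(String[] args) {
        List<int[]> a = Arrays.asList(new int[] {0, 1, 2}, new int[] {1, 3, 4});
        List<int[]> b = Arrays.asList(new int[] {0, 5, 2}, new int[] {1, 7, 8});
        System.out.println(cogroup(a, b, 2, false, false).keySet()); // prints [2, 4, 8]
        System.out.println(cogroup(a, b, 2, false, true).keySet());  // prints [2, 8]
        System.out.println(cogroup(a, b, 2, true, true).keySet());   // prints [2]
    }
}
```

With INNER on both relations, only keys present in both inputs survive, which is why the last result resembles an inner join before flattening.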









grunt> k = FOREACH c GENERATE a2, a2 * a3;
grunt> DUMP k;
(1,2)
(5,10)
(3,12)
(7,56)

FOREACH is always followed by an alias (name given to a relation) followed by the keyword GENERATE. The expressions after GENERATE control the output.






FOREACH has special projection syntax, and a richer set of functions. For example,
applying nested projection to have each bag retain only the first field:
grunt> k = FOREACH g GENERATE group, c.a1;
grunt> DUMP k;
(2,{(0),(0)})
(4,{(1)})
(8,{(1)})
To get two fields in each bag:
grunt> k = FOREACH g GENERATE group, c.(a1,a2);
grunt> DUMP k;
(2,{(0,1),(0,5)})
(4,{(1,3)})
(8,{(1,7)})
Most built-in Pig functions are geared toward working on bags.
grunt> k = FOREACH g GENERATE group, COUNT(c);
grunt> DUMP k;
(2,2L)
(4,1L)
(8,1L)





The FLATTEN function is designed to flatten nested data types. Syntactically it looks like a function, such as COUNT and AVG, but it’s a special operator as it can change the structure of the output created by FOREACH...GENERATE.

For example, consider a relation with tuples of the form (a, (b, c)). The statement FOREACH... GENERATE $0, FLATTEN($1) will create one output tuple of the form (a, b, c) for each input tuple.


If a bag contains N tuples, flattening it will remove the bag and create N tuples in its place.
grunt> k = FOREACH g GENERATE group, FLATTEN(c);
grunt> DUMP k;
(2,0,1,2)
(2,0,5,2)
(4,1,3,4)
(8,1,7,8)
grunt> DESCRIBE k;
k: {group: int,c::a1: int,c::a2: int,c::a3: int}
Another way to understand FLATTEN is to see that it produces a cross-product. This view is helpful when we use FLATTEN multiple times within a single FOREACH statement. For example, let’s say we’ve somehow created a relation l.
grunt> dump l;
(1,{(1,2)},{(3)})
(4,{(4,2),(4,3)},{(6),(9)})
(8,{(8,3),(8,4)},{(9)})
grunt> describe l;
l: {group: int,a: {a1: int,a2: int},b: {b1: int}}
The following statement that flattens two bags outputs all combinations of those two bags for each tuple:
grunt> m = FOREACH l GENERATE group, FLATTEN(a), FLATTEN(b);
grunt> dump m;


(1,1,2,3)
(4,4,2,6)
(4,4,2,9)
(4,4,3,6)
(4,4,3,9)
(8,8,3,9)
(8,8,4,9)
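
The cross-product view can be written out as two nested loops. The sketch below is plain Java, assumes tuples are modeled as lists of integers, and uses hypothetical names (FlattenSketch, flattenBoth); it illustrates the output shape only and is not Pig code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of GENERATE group, FLATTEN(a), FLATTEN(b)
// for a single input tuple. FlattenSketch and flattenBoth are hypothetical names.
class FlattenSketch {
    // Emits one row for every pairing of a tuple in bagA with a tuple in bagB.
    static List<List<Integer>> flattenBoth(int group,
                                           List<List<Integer>> bagA,
                                           List<List<Integer>> bagB) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> ta : bagA) {
            for (List<Integer> tb : bagB) {
                List<Integer> row = new ArrayList<>();
                row.add(group);
                row.addAll(ta);
                row.addAll(tb);
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> bagA = Arrays.asList(Arrays.asList(4, 2), Arrays.asList(4, 3));
        List<List<Integer>> bagB = Arrays.asList(Arrays.asList(6), Arrays.asList(9));
        // Corresponds to the tuple (4,{(4,2),(4,3)},{(6),(9)}) of l.
        for (List<Integer> row : flattenBoth(4, bagA, bagB)) {
            System.out.println(row);
        }
    }
}
```

A bag of 2 tuples combined with a bag of 2 tuples gives 4 rows, which matches the four output tuples for key 4 above.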




FOREACH also accepts a nested block for processing bags. Let’s assume you have a relation (say l) and one of its fields (say a) is a bag. A
FOREACH with a nested block has this form:
alias = FOREACH l {
    tmp1 = operation on a;
    [more operations...]
    GENERATE expr [, expr...];
};
The GENERATE statement must always be present at the end of the nested block. It will create some output for each tuple in l. The operations in the nested block can create new relations based on the bag a. For example, we can trim down the a bag in each tuple of l.




grunt> m = FOREACH l {
tmp1 = FILTER a BY a1 >= a2;
GENERATE group, tmp1, b;
};
grunt> DUMP m;
(1,{},{(3)})
(4,{(4,2),(4,3)},{(6),(9)})
(8,{(8,3),(8,4)},{(9)})
You can have multiple statements in the nested block. Each one can even be operating on different bags.
grunt> m = FOREACH l {
tmp1 = FILTER a BY a1 >= a2;
tmp2 = FILTER b by $0 < 7;
GENERATE group, tmp1, tmp2;
};
grunt> DUMP m;
(1,{},{(3)})
(4,{(4,2),(4,3)},{(6)})
(8,{(8,3),(8,4)},{})









UDF (User-Defined Functions):


UDFs are written in Java, and filter functions are all subclasses of FilterFunc, which
itself is a subclass of EvalFunc.

An example of a filter UDF:


package com.hadoopbook.pig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.pig.FilterFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

// Filter UDF that keeps only records whose quality code is 0, 1, 4, 5, or 9.
// Several imports are used only by getArgToFuncMapping(), shown further below.
public class IsGoodQuality extends FilterFunc {

    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        // An empty or missing tuple never passes the filter.
        if (tuple == null || tuple.size() == 0) {
            return false;
        }
        try {
            Object object = tuple.get(0);
            if (object == null) {
                return false;
            }
            // Assumes the field was declared as int in the schema.
            int i = (Integer) object;
            return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
        } catch (ExecException e) {
            throw new IOException(e);
        }
    }
}


After the UDF class is created, package it in a JAR file and register the JAR with Pig as follows:

grunt> REGISTER pig.jar;


Finally, we can invoke the function:
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>> com.hadoopbook.pig.IsGoodQuality(quality);



We can’t register our package with Pig, but we can shorten the function name by defining
an alias, using the DEFINE operator:
grunt> DEFINE isGood com.hadoopbook.pig.IsGoodQuality();
grunt> filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);



Leveraging types:



The filter works when the quality field is declared to be of type int, but if the type
information is absent, then the UDF fails! This happens because the field is the default
type, bytearray, represented by the DataByteArray class. Because DataByteArray is not
an Integer, the cast fails.
The obvious way to fix this is to convert the field to an integer in the exec() method.
However, there is a better way, which is to tell Pig the types of the fields that the function
expects. The getArgToFuncMapping() method on EvalFunc is provided for precisely this
reason. We can override it to tell Pig that the first field should be an integer:
@Override
public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
    List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>();
    funcSpecs.add(new FuncSpec(this.getClass().getName(),
        new Schema(new Schema.FieldSchema(null, DataType.INTEGER))));
    return funcSpecs;
}
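
For comparison, the first option mentioned above (converting the field inside exec()) might look roughly like the following. This is a plain-Java sketch with no Pig dependency: a raw byte[] stands in for Pig's bytearray, and QualityCoercionSketch, toInteger, and isGoodQuality are hypothetical names, not part of the Pig API.

```java
import java.nio.charset.StandardCharsets;

// Plain-Java illustration of coercing an untyped field before testing it.
// A byte[] stands in for Pig's bytearray; all names here are hypothetical.
class QualityCoercionSketch {
    // Returns the field as an Integer, or null if it cannot be interpreted as one.
    static Integer toInteger(Object field) {
        if (field instanceof Integer) {
            return (Integer) field;
        }
        if (field instanceof byte[]) {
            String text = new String((byte[]) field, StandardCharsets.UTF_8).trim();
            try {
                return Integer.valueOf(text);
            } catch (NumberFormatException e) {
                return null; // not a number: treat as unusable
            }
        }
        return null; // null or an unsupported type
    }

    // Same quality test as IsGoodQuality, applied after coercion.
    static boolean isGoodQuality(Object field) {
        Integer i = toInteger(field);
        return i != null && (i == 0 || i == 1 || i == 4 || i == 5 || i == 9);
    }

    public static void main(String[] args) {
        System.out.println(isGoodQuality(1)); // typed field
        System.out.println(isGoodQuality("4".getBytes(StandardCharsets.UTF_8))); // untyped field
    }
}
```

The drawback is that every UDF has to repeat this conversion logic, which is what declaring the expected types to Pig avoids.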





A Load UDF
We’ll demonstrate a custom load function that can read plain-text column ranges as
fields, very much like the Unix cut command. It is used as follows:
grunt> records = LOAD 'input/ncdc/micro/sample.txt'
>> USING com.hadoopbook.pig.CutLoadFunc('16-19,88-92,93-93')
>> AS (year:int, temperature:int, quality:int);
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)


The string passed to CutLoadFunc is the column specification; each comma-separated
range defines a field, which is assigned a name and type in the AS clause. Let’s examine
the implementation of CutLoadFunc:



public class CutLoadFunc extends Utf8StorageConverter implements LoadFunc {

    private static final Log LOG = LogFactory.getLog(CutLoadFunc.class);
    private static final Charset UTF8 = Charset.forName("UTF-8");
    private static final byte RECORD_DELIMITER = (byte) '\n';

    private TupleFactory tupleFactory = TupleFactory.getInstance();
    private BufferedPositionedInputStream in;
    private long end = Long.MAX_VALUE;
    private List<Range> ranges;

    public CutLoadFunc(String cutPattern) {
        ranges = Range.parse(cutPattern);
    }

    @Override
    public void bindTo(String fileName, BufferedPositionedInputStream in,
            long offset, long end) throws IOException {
        this.in = in;
        this.end = end;
        // Throw away the first (partial) record - it will be picked up by another
        // instance
        if (offset != 0) {
            getNext();
        }
    }

    @Override
    public Tuple getNext() throws IOException {
        if (in == null || in.getPosition() > end) {
            return null;
        }
        String line;
        while ((line = in.readLine(UTF8, RECORD_DELIMITER)) != null) {
            Tuple tuple = tupleFactory.newTuple(ranges.size());
            for (int i = 0; i < ranges.size(); i++) {
                try {
                    Range range = ranges.get(i);
                    if (range.getEnd() > line.length()) {
                        LOG.warn(String.format(
                            "Range end (%s) is longer than line length (%s)",
                            range.getEnd(), line.length()));
                        continue;
                    }
                    tuple.set(i, new DataByteArray(range.getSubstring(line)));
                } catch (ExecException e) {
                    throw new IOException(e);
                }
            }
            return tuple;
        }
        return null;
    }

    @Override
    public void fieldsToRead(Schema schema) {
        // Can't use this information to optimize, so ignore it
    }

    @Override
    public Schema determineSchema(String fileName, ExecType execType,
            DataStorage storage) throws IOException {
        // Cannot determine schema in general
        return null;
    }
}
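
The Range helper used by CutLoadFunc is not shown in these notes. The sketch below is one plausible way to parse a column specification such as '16-19,88-92,93-93'. It assumes 1-based, inclusive column positions (as with the Unix cut command), and the class name ColumnRangeSketch and its methods are hypothetical stand-ins rather than the original Range class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Range helper; not the original implementation.
// Assumes 1-based, inclusive column positions, as with the Unix cut command.
class ColumnRangeSketch {
    final int start; // first column, 1-based
    final int end;   // last column, 1-based and inclusive

    ColumnRangeSketch(int start, int end) {
        this.start = start;
        this.end = end;
    }

    // Parses a specification such as "16-19,88-92,93-93".
    static List<ColumnRangeSketch> parse(String spec) {
        List<ColumnRangeSketch> ranges = new ArrayList<>();
        for (String part : spec.split(",")) {
            String[] bounds = part.trim().split("-");
            ranges.add(new ColumnRangeSketch(
                Integer.parseInt(bounds[0]), Integer.parseInt(bounds[1])));
        }
        return ranges;
    }

    // Extracts this column range from a line; the caller must check the line length.
    String getSubstring(String line) {
        return line.substring(start - 1, end);
    }

    public static void main(String[] args) {
        String line = "1950 22x";
        for (ColumnRangeSketch r : parse("1-4,6-7")) {
            System.out.println(r.getSubstring(line));
        }
    }
}
```

Each parsed range then corresponds to one field of the tuple built in getNext().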



STREAM
The STREAM operator allows you to transform data in a relation using an external
program or script. It is named by analogy with Hadoop Streaming, which provides a
similar capability for MapReduce.
STREAM can use built-in commands with arguments. Here is an example that uses the
Unix cut command to extract the second field of each tuple in A. Note that the command
and its arguments are enclosed in backticks:
grunt> C = STREAM A THROUGH `cut -f 2`;
grunt> DUMP C;
(cherry)
(apple)
(banana)
(apple)




Parallelism



When running in Hadoop mode, you need to tell Pig how many reducers you want for
each job. You do this using a PARALLEL clause for operators that run in the reduce
phase, which includes all the grouping and joining operators (GROUP, COGROUP,
JOIN, CROSS), as well as DISTINCT and ORDER. By default the number of reducers
is one (just like for MapReduce), so it is important to set the degree of parallelism when
running on a large dataset. The following line sets the number of reducers to 30 for the
GROUP:

grouped_records = GROUP records BY year PARALLEL 30;





Parameter Substitution

If you have a Pig script that you run on a regular basis, then it’s quite common to want
to be able to run the same script with different parameters. For example, a script that
runs daily may use the date to determine which input files it runs over. Pig supports
parameter substitution, where parameters in the script are substituted with values
supplied at runtime. Parameters are denoted by identifiers prefixed with a $ character;
for example $input and $output, used in the following script to specify the input and
output paths:
-- max_temp_param.pig
records = LOAD '$input' AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND
(quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group,
MAX(filtered_records.temperature);
STORE max_temp into '$output';
Parameters can be specified when launching Pig, using the -param option, one for each
parameter:
% pig \
    -param input=/user/tom/input/ncdc/micro-tab/sample.txt \
    -param output=/tmp/out \
    src/main/ch11/pig/max_temp_param.pig
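
Conceptually, the substitution is a text replacement applied to the script before it is parsed. The plain-Java sketch below illustrates the idea with a deliberately simplified rule (a $ followed by an identifier); it is not Pig's preprocessor, and ParamSubstitutionSketch and substitute are hypothetical names. Throwing on an undefined parameter is a choice made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified illustration of parameter substitution; not Pig's preprocessor.
// ParamSubstitutionSketch and substitute are hypothetical names.
class ParamSubstitutionSketch {
    // Matches $name where name starts with a letter or underscore, so that
    // positional references such as $0 and $1 are left untouched.
    private static final Pattern PARAM = Pattern.compile("\\$([A-Za-z_][A-Za-z0-9_]*)");

    static String substitute(String script, Map<String, String> params) {
        Matcher m = PARAM.matcher(script);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = params.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("Undefined parameter: " + m.group(1));
            }
            // quoteReplacement keeps '$' and '\' in the value from being interpreted.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("input", "/user/tom/input/ncdc/micro-tab/sample.txt");
        params.put("output", "/tmp/out");
        System.out.println(substitute("STORE max_temp into '$output';", params));
        // prints STORE max_temp into '/tmp/out';
    }
}
```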





