hadoop - How do I transform a parameter in Pig?
I need to process a dataset in Pig that is available once per day at midnight. Therefore I have an Oozie coordinator that takes care of the scheduling and spawns a workflow every day at 00:00. The file names follow the URI scheme
hdfs://${dataroot}/input/raw${year}${month}${day}${hour}.avro
where ${hour} is always '00'.
Each entry in the dataset contains a Unix timestamp, and I want to filter out the entries that have a timestamp before 11:45pm (23:45). Since I need to run this on datasets from the past, the value of the timestamp defining the threshold needs to be set dynamically according to the day currently being processed. For example, processing the dataset of December 12th, 2013 needs the threshold 1418337900. For this reason, setting the threshold must be done by the coordinator.
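The threshold arithmetic described here can be sketched in Python. This is only an illustration: the function name `threshold_for` and the fixed UTC offset are assumptions, since the question does not say in which time zone the timestamps are interpreted.

```python
from datetime import date, datetime, timedelta, timezone


def threshold_for(day, utc_offset_hours=0):
    """Return the Unix timestamp 15 minutes before midnight at the start of `day`.

    `utc_offset_hours` is an assumed fixed offset; it ignores DST changes.
    """
    tz = timezone(timedelta(hours=utc_offset_hours))
    midnight = datetime(day.year, day.month, day.day, tzinfo=tz)
    return int((midnight - timedelta(minutes=15)).timestamp())


# One threshold per processed day, e.g. when reprocessing past datasets.
for d in (date(2015, 3, 1), date(2015, 3, 2)):
    print(d.isoformat(), threshold_for(d))
```

Consecutive days differ by exactly 86400 seconds under a fixed offset, which is why the value has to be derived from the processed day rather than hard-coded.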
To the best of my knowledge, there is no possibility to transform a formatted date into a Unix timestamp in EL. I came up with a quite hacky solution: the coordinator passes the date and time of the threshold to the respective workflow, which starts a parameterized instance of the Pig script.
Excerpt of coordinator.xml:

    <property>
        <name>threshold</name>
        <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -15, 'MINUTE'), 'yyyyMMddHHmm')}</value>
    </property>
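To make the EL expression above concrete, here is a rough Python equivalent of what it computes. It is a sketch, not Oozie code: `format_threshold` is a hypothetical helper, and it assumes the nominal time is handled without any time zone conversion.

```python
from datetime import datetime, timedelta


def format_threshold(nominal_time, offset_minutes=-15):
    """Shift the nominal time and format it as year, month, day, hour (24h), minute."""
    shifted = nominal_time + timedelta(minutes=offset_minutes)
    return shifted.strftime("%Y%m%d%H%M")


# A run scheduled for midnight gets a threshold on the previous day.
print(format_threshold(datetime(2015, 3, 1, 0, 0)))  # 201502282345
```

The result is still a formatted string, which is why the Pig side needs an additional conversion step before it can be compared with the numeric timestamps.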
Excerpt of workflow.xml:

    <action name="foo">
        <pig>
            <job-tracker>${jobtracker}</job-tracker>
            <name-node>${namenode}</name-node>
            <script>${applicationpath}/flights.pig</script>
            <param>jobinput=${jobinput}</param>
            <param>joboutput=${joboutput}</param>
            <param>threshold=${threshold}</param>
        </pig>
        <ok to="end"/>
        <error to="error"/>
    </action>
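The Pig script needs to convert this formatted datetime into a Unix timestamp. Therefore, I have written a UDF:

    import java.io.IOException;
    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;

    public class UnixTime extends EvalFunc<Long> {

        // Threshold in seconds since the epoch, computed once in the constructor.
        private long myTimestamp = 0L;

        // Parses dt with the given pattern (in the JVM's default time zone)
        // and returns seconds since the epoch.
        private static long convertDateTime(String dt, String format) throws IOException {
            DateFormat formatter = new SimpleDateFormat(format);
            Date date;
            try {
                date = formatter.parse(dt);
            } catch (ParseException ex) {
                throw new IOException("Illegal date: " + dt + " format: " + format);
            }
            return date.getTime() / 1000L;
        }

        public UnixTime(String dt, String format) throws IOException {
            myTimestamp = convertDateTime(dt, format);
        }

        // Ignores its input and always returns the precomputed threshold.
        @Override
        public Long exec(Tuple input) throws IOException {
            return myTimestamp;
        }
    }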
In the Pig script, a macro is created with DEFINE, initializing the UDF with the input from the coordinator/workflow. Then the timestamps can be filtered.

    DEFINE thresh mystuff.pig.UnixTime('$threshold', 'yyyyMMddHHmm');
    d = LOAD '$jobinput' USING PigStorage(',') AS (time: long, value: chararray);
    f = FILTER d BY time <= thresh();
    ...
The problem I have leads me to the more general question of whether it is possible to transform an input parameter in Pig and use it again as some kind of constant. Is there a better way to solve this problem, or is my approach needlessly complicated?
Edit: tl;dr
After more searching I found someone with the same problem: http://grokbase.com/t/pig/user/125gszzxnx/survey-where-are-all-the-udfs-and-macros
Thanks to Gaurav for recommending the UDFs in piggybank. It seems that there is no performant solution without using declare and a shell script.
You can put the Pig script into a Python script and pass the value.

    #!/usr/bin/python
    import sys
    import time
    from org.apache.pig.scripting import Pig

    # Pig script with $-placeholders that are filled in by bind() below.
    P = Pig.compile("""d = LOAD '$jobinput' USING PigStorage(',') AS (time: long, value: chararray);
    f = FILTER d BY time <= $thresh;
    """)

    # Placeholders from the answer; replace them with real values.
    jobinput = {whatever defined}
    thresh = {whatever defined in udf}

    Q = P.bind({'thresh': thresh, 'jobinput': jobinput})
    results = Q.runSingle()

    # isSuccessful() returns a boolean, so test it directly.
    if not results.isSuccessful():
        raise RuntimeError("Pig job failed")
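The `thresh` placeholder in the script above could be filled by doing in plain Python what the UDF in the question does. A minimal sketch, assuming the threshold arrives as a string in year-month-day-hour-minute form; `to_unix_time` is a hypothetical helper, and it sticks to the `time` and `calendar` modules so that it should also run under the Jython interpreter used for embedded Pig.

```python
import calendar
import time


def to_unix_time(dt, fmt="%Y%m%d%H%M", utc=False):
    """Convert a formatted date/time string to seconds since the epoch.

    With utc=False the string is read in the machine's local time zone,
    which mirrors the default behaviour of SimpleDateFormat in the Java UDF.
    """
    parsed = time.strptime(dt, fmt)
    if utc:
        return calendar.timegm(parsed)
    return int(time.mktime(parsed))


# Example value; in the script above this would be bound as 'thresh'.
thresh = to_unix_time("201502282345", utc=True)
print(thresh)
```

Whether to read the string as local time or as UTC depends on how the timestamps in the dataset were produced, so the `utc` switch is left explicit here.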