Main tasks of ETL (extract/transform/load):
Luigi has no built-in way to trigger pipelines on a schedule. Scheduled runs rely on an external scheduler, typically cron.
my_tasks.py:
import datetime
import luigi
import luigi.contrib.hdfs

class DataDump(luigi.ExternalTask):
    """Daily dump produced outside Luigi; Luigi only checks that it exists."""
    date = luigi.DateParameter()
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('/var/log/dump/%Y-%m-%d.txt'))

class AggregationTask(luigi.Task):
    date = luigi.DateParameter()
    window = luigi.IntParameter()
    def requires(self):
        # one dump per day in the window that ends on self.date
        return [DataDump(self.date - datetime.timedelta(i)) for i in range(self.window)]
    def run(self):
        # run_some_cool_stuff is a placeholder for the actual aggregation code
        run_some_cool_stuff(self.input())
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/aggregated-%s-%d' % (self.date, self.window))

class RunAll(luigi.Task):
    '''Dummy task that triggers execution of other tasks'''
    def requires(self):
        for window in [3, 7, 14]:
            for d in range(10):  # guarantee that aggregations were run for the past 10 days
                yield AggregationTask(datetime.date.today() - datetime.timedelta(d), window)
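To get a feel for how much work RunAll fans out to, the plain-Python sketch below (no Luigi needed) recomputes which DataDump dates the task tree depends on. It mirrors the loops in RunAll.requires and AggregationTask.requires; the function name dump_dates_needed and the fixed "today" are hypothetical, chosen only for illustration.

```python
import datetime


def dump_dates_needed(today, windows=(3, 7, 14), days=10):
    """Return the sorted DataDump dates that RunAll transitively requires.

    Mirrors RunAll.requires (one AggregationTask per window and per day in
    the last `days` days) and AggregationTask.requires (one DataDump per day
    in the window ending on the aggregation date).
    """
    needed = set()
    for window in windows:
        for d in range(days):
            agg_date = today - datetime.timedelta(d)
            for i in range(window):
                needed.add(agg_date - datetime.timedelta(i))
    return sorted(needed)


today = datetime.date(2024, 1, 31)  # hypothetical "today" for a reproducible result
dates = dump_dates_needed(today)
print(len(dates), dates[0], dates[-1])  # 23 2024-01-09 2024-01-31
```

The 30 AggregationTask instances share only 23 distinct input dates, because the oldest dump needed is 9 + 13 = 22 days back. Luigi identifies tasks by class and parameter values, so repeated requirements such as the same DataDump date collapse into a single node of the dependency graph.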
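The pipeline is then launched from cron. For example, this line in the system crontab (/etc/crontab, whose entries include a user field) runs RunAll every day at 00:30 as my-user: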
30 0 * * * my-user luigi RunAll --module my_tasks
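The crontab line above uses the system-crontab layout: five time fields (minute, hour, day of month, month, day of week), then the user, then the command. The simplified matcher below makes that field order explicit. It assumes only plain numbers and `*`; real cron also accepts ranges, lists and steps, and ORs day of month with day of week when both are restricted. The helpers parse_system_crontab and matches are hypothetical.

```python
import datetime


def parse_system_crontab(line):
    """Split a system crontab line into (time fields, user, command)."""
    # 5 time fields + user; everything after that is the command
    minute, hour, dom, month, dow, user, command = line.split(None, 6)
    return (minute, hour, dom, month, dow), user, command


def matches(fields, when):
    """Check whether datetime `when` matches the five time fields.

    Simplification: each field is '*' or a single integer; day of week
    uses 0-6 with 0 = Sunday (7 is not handled).
    """
    minute, hour, dom, month, dow = fields
    cron_dow = (when.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0
    pairs = [(minute, when.minute), (hour, when.hour), (dom, when.day),
             (month, when.month), (dow, cron_dow)]
    return all(f == "*" or int(f) == value for f, value in pairs)


fields, user, command = parse_system_crontab(
    "30 0 * * * my-user luigi RunAll --module my_tasks")
print(user, "|", command)                                  # my-user | luigi RunAll --module my_tasks
print(matches(fields, datetime.datetime(2024, 5, 1, 0, 30)))   # True
print(matches(fields, datetime.datetime(2024, 5, 1, 12, 30)))  # False
```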
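To backfill a task over an interval of dates, Luigi's range tools (luigi.tools.range) can be used from the command line, for example RangeDaily:
luigi --module all_reports RangeDaily --of AllReportsV2 --start 2014-10-31 --stop 2014-12-25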
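In the RangeDaily command, --start is inclusive and --stop is exclusive, so AllReportsV2 is requested for every day from 2014-10-31 through 2014-12-24, and range tasks only schedule the dates whose output is still missing. The plain-Python model below reproduces just that date arithmetic and the skip-existing behaviour. It is a simplification, not Luigi's implementation, and missing_days is a hypothetical name.

```python
import datetime


def missing_days(start, stop, done):
    """Dates in [start, stop) that are not in `done`, oldest first."""
    days = []
    d = start
    while d < stop:          # stop is exclusive
        if d not in done:    # skip dates whose output already exists
            days.append(d)
        d += datetime.timedelta(days=1)
    return days


start = datetime.date(2014, 10, 31)
stop = datetime.date(2014, 12, 25)
todo = missing_days(start, stop, done=set())
print(len(todo), todo[0], todo[-1])  # 55 2014-10-31 2014-12-24
```

Passing the already-finished dates in `done` shrinks the list accordingly, which is why re-running the same range command after a partial failure only redoes the missing days.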
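Tasks can declare the resources they use in a resources dict. The scheduler will not run tasks concurrently if their combined usage would exceed the limits set in the [resources] section of the configuration. A running task can release part of its resources before it finishes by calling decrease_running_resources:
import luigi

class A(luigi.Task):
    # set maximum resources a priori
    resources = {"some_resource": 3}

    def run(self):
        # do something
        ...
        # decrease consumption of "some_resource" by one
        self.decrease_running_resources({"some_resource": 1})
        # continue with reduced resources
        ...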
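The accounting behind resources and decrease_running_resources can be modelled in plain Python: a task is admitted only if the amounts it declares fit into what is left of each resource, and releasing part of a running task's share early lets another task start sooner. The ResourcePool class and the limit of 3 units are assumptions for illustration, not Luigi's scheduler code.

```python
class ResourcePool:
    """Toy model of scheduler-side resource accounting (not Luigi's code)."""

    def __init__(self, limits):
        self.limits = dict(limits)  # resource -> total units available
        self.running = {}           # task name -> resources it currently holds

    def used(self, resource):
        return sum(held.get(resource, 0) for held in self.running.values())

    def try_start(self, task, needed):
        """Start `task` only if every needed amount fits in the remaining capacity."""
        for resource, amount in needed.items():
            # in this toy model, a resource without a configured limit has no capacity
            if self.used(resource) + amount > self.limits.get(resource, 0):
                return False
        self.running[task] = dict(needed)
        return True

    def decrease(self, task, amounts):
        """Release part of a running task's resources (cf. decrease_running_resources)."""
        held = self.running[task]
        for resource, amount in amounts.items():
            held[resource] = max(0, held.get(resource, 0) - amount)

    def finish(self, task):
        self.running.pop(task, None)


pool = ResourcePool({"some_resource": 3})
print(pool.try_start("A", {"some_resource": 3}))  # True: A takes all 3 units
print(pool.try_start("B", {"some_resource": 1}))  # False: nothing left
pool.decrease("A", {"some_resource": 1})          # A now holds 2 units
print(pool.try_start("B", {"some_resource": 1}))  # True
```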
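luigi.notifications: the module that sends e-mail notifications when tasks fail; the recipient and delivery method are set in the [email] section of the configuration.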