Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

I have a luigi workflow that downloads a number of large files via FTP and deposits them on S3.

I have one task that reads a list of files to download, then creates a set of tasks that actually do the downloads.

The idea is that the result of this workflow is a single file containing a list of downloads that have succeeded, with any failed downloads being reattempted on the next run the following day.

The problem is that if any of the download tasks fails then the successful download list is never created.

This is because the dynamically created tasks become requirements of the main task that creates them and compiles a list from their outputs.

Is there a way to make failures of these download tasks insignificant, so that the list is compiled minus the outputs of the failed tasks?

Example code is below. GetFiles is the task that we call from the command line.

import json

import luigi
from luigi.contrib.s3 import S3Client, S3Target
from luigi.util import requires


class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            # WriteFileFromFtp is a helper defined elsewhere (not shown).
            WriteFileFromFtp(self.sourceUrl, output)

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)


@requires(GetListOfFileToDownload)
class GetFiles(luigi.Task):

    def run(self):

        with self.input().open('r') as fileList:
            files = json.load(fileList)

            tasks = []
            taskOutputs = []

            for file in files:
                task = DownloadFileFromFtp(sourceUrl=file["ftpUrl"])
                tasks.append(task)
                taskOutputs.append(task.output())

            # Dynamic dependencies: luigi does not continue past this point
            # until every yielded task is complete.
            yield tasks

            successfulDownloads = MakeSuccessfulOutputList(taskOutputs)

        with self.output().open('w') as out:
            json.dump(successfulDownloads, out)

    def output(self):
        client = S3Client()
        return S3Target(path='successfulDownloads.json', client=client)

1 Answer

Several years later, you must have found the answer, but here is something that can help.

import luigi
from luigi.contrib.s3 import S3Client, S3Target


class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            WriteFileFromFtp(self.sourceUrl, output)

    def on_failure(self, exception):
        # If the task fails because the file was not found,
        # then just indicate the task as completed.
        # on_failure receives the exception raised in run(), not a string,
        # so match against its repr().

        if "FileNotFound" in repr(exception):
            return self.complete(ignore=True)
        return self.complete(ignore=False)

    def complete(self, ignore=False):
        return ignore

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)
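
Whichever way the failure is signalled, the summary step only has to keep the outputs that actually exist. Below is a minimal, luigi-free sketch of that idea. `InMemoryTarget`, `download_tolerantly`, `fake_fetch` and `make_successful_output_list` are hypothetical names made up for illustration (the last one is only a guess at what `MakeSuccessfulOutputList` in the question might do). The sketch assumes each target offers a `path` attribute and an `exists()` method, as luigi targets do.

```python
from typing import Callable, Iterable, List, Optional


class InMemoryTarget:
    """Hypothetical stand-in for a luigi target; only path and exists() matter here."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.data: Optional[bytes] = None

    def exists(self) -> bool:
        return self.data is not None


def download_tolerantly(url: str, target: InMemoryTarget,
                        fetch: Callable[[str], bytes]) -> bool:
    """Attempt one download; return False instead of raising on failure."""
    try:
        data = fetch(url)
    except Exception:
        # Swallow the error so one bad file cannot block the summary step.
        return False
    target.data = data
    return True


def make_successful_output_list(targets: Iterable[InMemoryTarget]) -> List[str]:
    """Keep only the paths whose output was actually written."""
    return [t.path for t in targets if t.exists()]


def fake_fetch(url: str) -> bytes:
    """Hypothetical FTP fetch: fails for any URL containing 'missing'."""
    if "missing" in url:
        raise FileNotFoundError(url)
    return b"payload"


urls = ["ftp://example.invalid/a.bin",
        "ftp://example.invalid/missing.bin",
        "ftp://example.invalid/c.bin"]
targets = [InMemoryTarget(f"out/{i}.bin") for i in range(len(urls))]
results = [download_tolerantly(u, t, fake_fetch) for u, t in zip(urls, targets)]
successful = make_successful_output_list(targets)
print(successful)  # ['out/0.bin', 'out/2.bin']
```

In a real workflow the try/except would presumably sit inside `DownloadFileFromFtp.run()`, and the filter would run over `taskOutputs` in `GetFiles`. How the download task then reports itself as complete is not shown here and would need testing against your luigi version.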
