[smila-user] Push records from Filesystem Crawler to a running Job

Hi,

I'm trying to crawl a directory on the file system (using the provided file crawler job) that contains 29 video files, so that they can be processed by a further pipeline.
The pipeline, called FrameSegmentationPipeline, is contained in a workflow called FrameSegmentationWorkflow, and the related job is called FrameSegmentationJob.
Here are the JSON definitions:

workflows.json

{
  "workflows":[

...

    {
       "name":"FrameSegmentationWorkflow",
       "modes":[
        "standard"
      ],
       "parameters":{
        "pipelineRunBulkSize":"20"
       },
       "startAction":{
          "worker":"bulkbuilder",
          "output":{
            "insertedRecords":"importBucket"
          }
       },
       "actions":[
          {
             "worker":"pipelineProcessor",
             "parameters":
              {
                  "pipelineName": "FrameSegmentationPipeline"
              },
             "input":{
                "input":"importBucket"
             }
          }
       ]
    },

...

    {
      "name":"fileCrawling",
      "modes":[
        "runOnce"
      ],
      "startAction":{
        "worker":"fileCrawler",
        "input":{
          "directoriesToCrawl":"dirsToCrawlBucket"
        },
        "output":{
          "directoriesToCrawl":"dirsToCrawlBucket",
          "filesToCrawl":"filesToCrawlBucket"
        }
      },
      "actions":[
        {
          "worker":"deltaChecker",
          "input":{
            "recordsToCheck":"filesToCrawlBucket"
          },
          "output":{
            "updatedRecords":"filesToFetchBucket"
          }
        },
        {
          "worker":"fileFetcher",
          "input":{
            "filesToFetch":"filesToFetchBucket"
          },
          "output":{
            "files":"filesToPushBucket"
          }
        },
        {
          "worker":"updatePusher",
          "input":{
            "recordsToPush":"filesToPushBucket"
          }
        }
      ]
    },
  
...

  ]
}

jobs.json

{
  "jobs":[
     ...
    {
      "name":"FrameSegmentationJob",
      "parameters":{
        "tempStore":"temp"
      },
      "workflow":"FrameSegmentationWorkflow"
    },

  ...

    {
      "name":"crawlFilesystem",
      "workflow":"fileCrawling",
      "parameters":{
        "tempStore":"temp",
        "dataSource":"file",
        "rootFolder":"D:/videos",
        "jobToPushTo":"FrameSegmentationJob"
      }
    },
   
   ...

  ]
}

I start FrameSegmentationJob through a REST API call first, then the file system crawler job (crawlFilesystem).
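
For reference, the start order described above can be sketched as follows. This is only an illustration: the port 8080 comes from the log, but the base path "/smila" and the endpoint pattern "jobmanager/jobs/<job name>/" are assumptions about the job manager REST API, and start_job_request is a hypothetical helper name. The requests are only built here, not sent.

```python
import urllib.request

# Port 8080 is taken from SMILA.log; the "/smila" base path is an assumption.
SMILA_BASE = "http://localhost:8080/smila"


def start_job_request(job_name, base=SMILA_BASE):
    """Build (but do not send) a POST request that would start a job run.

    The URL pattern <base>/jobmanager/jobs/<job name>/ is assumed, not
    confirmed by this post.
    """
    url = f"{base}/jobmanager/jobs/{job_name}/"
    return urllib.request.Request(url, data=b"", method="POST")


# The target job is started first, then the crawl job that pushes to it.
requests_in_order = [
    start_job_request(name)
    for name in ("FrameSegmentationJob", "crawlFilesystem")
]

for req in requests_in_order:
    print(req.get_method(), req.full_url)
```

To actually send a request, pass it to urllib.request.urlopen(req) while SMILA is running.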

This is the resulting SMILA.log:

...
 2012-03-07 00:46:23,709 INFO  [Component Resolve Thread (Bundle 127)        ]  internal.HttpServiceImpl                      - HTTP server started successfully on port 8080.
 2012-03-07 00:46:25,644 INFO  [qtp2144980928-58                             ]  internal.JobRunEngineImpl                     - start called for job 'FrameSegmentationJob', jobRunMode 'null'
 2012-03-07 00:46:26,592 INFO  [qtp2144980928-58                             ]  zk.RunStorageZk                               - Changing job state for job run '20120307-004625679706' for job 'FrameSegmentationJob' to state RUNNING while expecting state PREPARING returned result: true
 2012-03-07 00:46:26,592 INFO  [qtp2144980928-58                             ]  internal.JobRunEngineImpl                     - started job run '20120307-004625679706' for job 'FrameSegmentationJob'
 2012-03-07 00:46:26,613 INFO  [qtp2144980928-53                             ]  internal.JobRunEngineImpl                     - start called for job 'crawlFilesystem', jobRunMode 'null'
 2012-03-07 00:46:27,341 INFO  [qtp2144980928-53                             ]  zk.RunStorageZk                               - Changing job state for job run '20120307-004626617901' for job 'crawlFilesystem' to state RUNNING while expecting state PREPARING returned result: true
 2012-03-07 00:46:27,541 INFO  [qtp2144980928-53                             ]  internal.JobRunEngineImpl                     - finish called for job 'crawlFilesystem', run '20120307-004626617901'
 2012-03-07 00:46:27,559 INFO  [qtp2144980928-53                             ]  helper.BulkbuilderTaskProvider                - Could not find task to be finished for job 'crawlFilesystem'.
 2012-03-07 00:46:27,999 INFO  [qtp2144980928-53                             ]  internal.JobRunEngineImpl                     - started job run '20120307-004626617901' for job 'crawlFilesystem'
 2012-03-07 00:46:28,303 INFO  [pool-5-thread-1                              ]  file.FileCrawlerWorker                        - directory D:\videos contained 29 files and 0 directories.
 2012-03-07 00:46:30,804 INFO  [pool-5-thread-1                              ]  zk.RunStorageZk                               - Changing job state for job run '20120307-004626617901' for job 'crawlFilesystem' to state COMPLETING while expecting state FINISHING returned result: true
 2012-03-07 00:46:30,854 INFO  [pool-5-thread-1                              ]  zk.RunStorageZk                               - Changing job state for job run '20120307-004626617901' for job 'crawlFilesystem' to state CLEANINGUP while expecting state COMPLETING returned result: true
 2012-03-07 00:46:30,854 INFO  [pool-5-thread-1                              ]  internal.JobRunEngineImpl                     - Cleaning up job run '20120307-004626617901' for job 'crawlFilesystem' with final state SUCCEEDED
 2012-03-07 00:46:30,890 INFO  [pool-5-thread-1                              ]  objectstore.PermanentStorageObjectstore       - Job run data of run '20120307-004626617901' for job 'crawlFilesystem': {
  "endTime" : "2012-03-07T00:46:30.872+0100",
  "finishTime" : "2012-03-07T00:46:27.583+0100",
  "jobId" : "20120307-004626617901",
  "mode" : "RUNONCE",
  "startTime" : "2012-03-07T00:46:27.116+0100",
  "state" : "SUCCEEDED",
  "workflowRuns" : {
    "activeWorkflowRunCount" : 0,
    "canceledWorkflowRunCount" : 0,
    "failedWorkflowRunCount" : 0,
    "startedWorkflowRunCount" : 1,
    "successfulWorkflowRunCount" : 1
  },
  "tasks" : {
    "canceledTaskCount" : 0,
    "createdTaskCount" : 2,
    "failedAfterRetryTaskCount" : 0,
    "failedWithoutRetryTaskCount" : 0,
    "obsoleteTaskCount" : 0,
    "retriedAfterErrorTaskCount" : 0,
    "retriedAfterTimeoutTaskCount" : 0,
    "successfulTaskCount" : 2
  },
  "worker" : {
    "deltaChecker" : {
      "warnCount" : 0,
      "duration" : 0.235910187,
      "duration.iodata" : 7.79314E-4,
      "duration.iodata.close" : 2.041E-6,
      "duration.iodata.open" : 7.77273E-4,
      "duration.perform" : 0.235113728,
      "duration.perform.function.checkDeltaState" : 0.19739352200000002,
      "duration.perform.input" : 0.0321409,
      "duration.perform.input.recordsToCheck" : 0.0321409,
      "endTime" : "2012-03-07T00:46:29.785+0100",
      "input.recordsToCheck.dataObjectCount" : 1,
      "input.recordsToCheck.recordCount" : 29,
      "input.recordsToCheck.size" : 7377,
      "output.updatedRecords.dataObjectCount" : 1,
      "output.updatedRecords.recordCount" : 0,
      "output.updatedRecords.size" : 0,
      "startTime" : "2012-03-07T00:46:29.266+0100",
      "successfulTaskCount" : 1
    },
    "fileCrawler" : {
      "warnCount" : 0,
      "duration" : 0.085947071,
      "duration.iodata" : 0.038523606,
      "duration.iodata.close" : 0.035147121,
      "duration.iodata.open" : 0.003376485,
      "duration.perform" : 0.047067487,
      "duration.perform.output" : 0.023699489,
      "duration.perform.output.filesToCrawl" : 0.023699489,
      "endTime" : "2012-03-07T00:46:28.415+0100",
      "output.filesToCrawl.dataObjectCount" : 1,
      "output.filesToCrawl.recordCount" : 29,
      "output.filesToCrawl.size" : 7377,
      "startTime" : "2012-03-07T00:46:28.125+0100",
      "successfulTaskCount" : 1
    }
  },
  "jobDefinition" : {
    "name" : "crawlFilesystem",
    "readOnly" : true,
    "parameters" : {
      "tempStore" : "temp",
      "dataSource" : "file",
      "rootFolder" : "D:/videos",
      "jobToPushTo" : "FrameSegmentationJob"
    },
    "workflow" : "fileCrawling"
  },
  "workflowDefinition" : {
    "name" : "fileCrawling",
    "readOnly" : true,
    "modes" : [ "runOnce" ],
    "startAction" : {
      "worker" : "fileCrawler",
      "input" : {
        "directoriesToCrawl" : "dirsToCrawlBucket"
      },
      "output" : {
        "directoriesToCrawl" : "dirsToCrawlBucket",
        "filesToCrawl" : "filesToCrawlBucket"
      }
    },
    "actions" : [ {
      "worker" : "deltaChecker",
      "input" : {
        "recordsToCheck" : "filesToCrawlBucket"
      },
      "output" : {
        "updatedRecords" : "filesToFetchBucket"
      }
    }, {
      "worker" : "fileFetcher",
      "input" : {
        "filesToFetch" : "filesToFetchBucket"
      },
      "output" : {
        "files" : "filesToPushBucket"
      }
    }, {
      "worker" : "updatePusher",
      "input" : {
        "recordsToPush" : "filesToPushBucket"
      }
    } ]
  }
}
 2012-03-07 00:46:31,812 INFO  [pool-5-thread-1                              ]  zookeeper.ZkConnection                        - ZkConnection operation time: 240 ms, tries: 1, operation: deleteNode '/smila/jobmanager/jobs/crawlFilesystem/data/startTime'



It seems that the files were crawled but were not pushed correctly to FrameSegmentationJob, so they never reach the pipeline. I checked with JConsole and found that the pipeline is never invoked.
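
To see where the records stop, the per-worker record counters in the job run data can be tallied with a short script. This is a minimal sketch: record_counts is a hypothetical helper, and the sample below is a hand-copied excerpt of the "worker" section from the log above, keeping only the record counters.

```python
import json


def record_counts(job_run_data):
    """Return {worker: {counter: value}} for every '*.recordCount' entry."""
    summary = {}
    for worker, stats in job_run_data.get("worker", {}).items():
        summary[worker] = {
            key: value
            for key, value in stats.items()
            if key.endswith(".recordCount")
        }
    return summary


# Excerpt of the job run data logged for 'crawlFilesystem' (counters only).
sample = json.loads("""
{
  "worker": {
    "deltaChecker": {
      "input.recordsToCheck.recordCount": 29,
      "output.updatedRecords.recordCount": 0
    },
    "fileCrawler": {
      "output.filesToCrawl.recordCount": 29
    }
  }
}
""")

counts = record_counts(sample)
for worker in sorted(counts):
    for counter in sorted(counts[worker]):
        print(f"{worker}: {counter} = {counts[worker][counter]}")
```

For the run above, this lists 29 records written by fileCrawler, 29 records read by deltaChecker and 0 records written by it. The job run data contains no entries at all for fileFetcher or updatePusher.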

Am I missing any configuration? Or am I simply doing something wrong?

Thank you,

Nicolò Aquilini