[smila-user] Push records from Filesystem Crawler to a running Job
Hi,
I'm trying to crawl a file system directory containing 29 video files (using the provided file crawler job) in order to process them in a downstream pipeline.
The pipeline called FrameSegmentationPipeline is contained within a workflow called FrameSegmentationWorkflow, and the related Job is called FrameSegmentationJob.
Here are the JSON definitions:
workflows.json
{
  "workflows": [
    ...
    {
      "name": "FrameSegmentationWorkflow",
      "modes": ["standard"],
      "parameters": {
        "pipelineRunBulkSize": "20"
      },
      "startAction": {
        "worker": "bulkbuilder",
        "output": {
          "insertedRecords": "importBucket"
        }
      },
      "actions": [
        {
          "worker": "pipelineProcessor",
          "parameters": {
            "pipelineName": "FrameSegmentationPipeline"
          },
          "input": {
            "input": "importBucket"
          }
        }
      ]
    },
    ...
    {
      "name": "fileCrawling",
      "modes": ["runOnce"],
      "startAction": {
        "worker": "fileCrawler",
        "input": {
          "directoriesToCrawl": "dirsToCrawlBucket"
        },
        "output": {
          "directoriesToCrawl": "dirsToCrawlBucket",
          "filesToCrawl": "filesToCrawlBucket"
        }
      },
      "actions": [
        {
          "worker": "deltaChecker",
          "input": {
            "recordsToCheck": "filesToCrawlBucket"
          },
          "output": {
            "updatedRecords": "filesToFetchBucket"
          }
        },
        {
          "worker": "fileFetcher",
          "input": {
            "filesToFetch": "filesToFetchBucket"
          },
          "output": {
            "files": "filesToPushBucket"
          }
        },
        {
          "worker": "updatePusher",
          "input": {
            "recordsToPush": "filesToPushBucket"
          }
        }
      ]
    },
    ...
  ]
}
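To rule out a simple wiring mistake, the bucket plumbing of the fileCrawling workflow can be checked mechanically: every action's input bucket should be produced by the startAction or by an earlier action. A quick sketch (bucket names copied from the workflow definition above):

```python
import json

# fileCrawling workflow, abridged to just the bucket wiring from the post.
workflow = json.loads("""
{
  "startAction": {
    "worker": "fileCrawler",
    "input":  {"directoriesToCrawl": "dirsToCrawlBucket"},
    "output": {"directoriesToCrawl": "dirsToCrawlBucket",
               "filesToCrawl": "filesToCrawlBucket"}
  },
  "actions": [
    {"worker": "deltaChecker",
     "input":  {"recordsToCheck": "filesToCrawlBucket"},
     "output": {"updatedRecords": "filesToFetchBucket"}},
    {"worker": "fileFetcher",
     "input":  {"filesToFetch": "filesToFetchBucket"},
     "output": {"files": "filesToPushBucket"}},
    {"worker": "updatePusher",
     "input":  {"recordsToPush": "filesToPushBucket"}}
  ]
}
""")

# Buckets available so far, seeded with the startAction's outputs.
produced = set(workflow["startAction"]["output"].values())
for action in workflow["actions"]:
    for slot, bucket in action["input"].items():
        # Every input bucket must have been produced upstream.
        assert bucket in produced, (action["worker"], slot, bucket)
    produced |= set(action.get("output", {}).values())

print("bucket wiring OK")
```

The wiring above checks out, so the records should reach the updatePusher.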
jobs.json
{
  "jobs": [
    ...
    {
      "name": "FrameSegmentationJob",
      "parameters": {
        "tempStore": "temp"
      },
      "workflow": "FrameSegmentationWorkflow"
    },
    ...
    {
      "name": "crawlFilesystem",
      "workflow": "fileCrawling",
      "parameters": {
        "tempStore": "temp",
        "dataSource": "file",
        "rootFolder": "D:/videos",
        "jobToPushTo": "FrameSegmentationJob"
      }
    },
    ...
  ]
}
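Likewise, the link between the two jobs can be sanity-checked: the crawl job's jobToPushTo parameter must name a defined job, and that job's workflow should be the one whose startAction is the bulkbuilder (which is what receives the pushed records). A small sketch over the jobs.json content above:

```python
import json

# The two job definitions from jobs.json, as posted.
jobs = json.loads("""
{"jobs": [
  {"name": "FrameSegmentationJob",
   "parameters": {"tempStore": "temp"},
   "workflow": "FrameSegmentationWorkflow"},
  {"name": "crawlFilesystem",
   "workflow": "fileCrawling",
   "parameters": {"tempStore": "temp", "dataSource": "file",
                  "rootFolder": "D:/videos",
                  "jobToPushTo": "FrameSegmentationJob"}}
]}""")["jobs"]

by_name = {j["name"]: j for j in jobs}
target = by_name["crawlFilesystem"]["parameters"]["jobToPushTo"]

# The push target must itself be a defined job...
assert target in by_name
# ...and it runs the workflow that starts with the bulkbuilder.
print(target, "->", by_name[target]["workflow"])
```

This also looks consistent, which is why I'm puzzled.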
I start the FrameSegmentationJob through a REST API call first, then the crawlFilesystem job.
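For reference, this is roughly how the start calls are issued (a sketch, not a definitive recipe: the host and port are taken from the log below, and the /smila/jobmanager/jobs/&lt;name&gt;/ endpoint is the SMILA job manager REST API as I understand it):

```python
import urllib.request

BASE = "http://localhost:8080/smila/jobmanager/jobs"  # port per the log below

def start_job_request(job_name: str) -> urllib.request.Request:
    """Build the POST request that starts a job run (not sent here)."""
    url = f"{BASE}/{job_name}/"
    return urllib.request.Request(url, data=b"", method="POST")

# Start the push target first, then the crawl job:
for name in ("FrameSegmentationJob", "crawlFilesystem"):
    req = start_job_request(name)
    print(req.get_method(), req.full_url)
    # urllib.request.urlopen(req)  # only with a running SMILA instance
```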
I get the following SMILA.log:
...
2012-03-07 00:46:23,709 INFO [Component Resolve Thread (Bundle 127) ] internal.HttpServiceImpl - HTTP server started successfully on port 8080.
2012-03-07 00:46:25,644 INFO [qtp2144980928-58 ] internal.JobRunEngineImpl - start called for job 'FrameSegmentationJob', jobRunMode 'null'
2012-03-07 00:46:26,592 INFO [qtp2144980928-58 ] zk.RunStorageZk - Changing job state for job run '20120307-004625679706' for job 'FrameSegmentationJob' to state RUNNING while expecting state PREPARING returned result: true
2012-03-07 00:46:26,592 INFO [qtp2144980928-58 ] internal.JobRunEngineImpl - started job run '20120307-004625679706' for job 'FrameSegmentationJob'
2012-03-07 00:46:26,613 INFO [qtp2144980928-53 ] internal.JobRunEngineImpl - start called for job 'crawlFilesystem', jobRunMode 'null'
2012-03-07 00:46:27,341 INFO [qtp2144980928-53 ] zk.RunStorageZk - Changing job state for job run '20120307-004626617901' for job 'crawlFilesystem' to state RUNNING while expecting state PREPARING returned result: true
2012-03-07 00:46:27,541 INFO [qtp2144980928-53 ] internal.JobRunEngineImpl - finish called for job 'crawlFilesystem', run '20120307-004626617901'
2012-03-07 00:46:27,559 INFO [qtp2144980928-53 ] helper.BulkbuilderTaskProvider - Could not find task to be finished for job 'crawlFilesystem'.
2012-03-07 00:46:27,999 INFO [qtp2144980928-53 ] internal.JobRunEngineImpl - started job run '20120307-004626617901' for job 'crawlFilesystem'
2012-03-07 00:46:28,303 INFO [pool-5-thread-1 ] file.FileCrawlerWorker - directory D:\videos contained 29 files and 0 directories.
2012-03-07 00:46:30,804 INFO [pool-5-thread-1 ] zk.RunStorageZk - Changing job state for job run '20120307-004626617901' for job 'crawlFilesystem' to state COMPLETING while expecting state FINISHING returned result: true
2012-03-07 00:46:30,854 INFO [pool-5-thread-1 ] zk.RunStorageZk - Changing job state for job run '20120307-004626617901' for job 'crawlFilesystem' to state CLEANINGUP while expecting state COMPLETING returned result: true
2012-03-07 00:46:30,854 INFO [pool-5-thread-1 ] internal.JobRunEngineImpl - Cleaning up job run '20120307-004626617901' for job 'crawlFilesystem' with final state SUCCEEDED
2012-03-07 00:46:30,890 INFO [pool-5-thread-1 ] objectstore.PermanentStorageObjectstore - Job run data of run '20120307-004626617901' for job 'crawlFilesystem': {
"endTime" : "2012-03-07T00:46:30.872+0100",
"finishTime" : "2012-03-07T00:46:27.583+0100",
"jobId" : "20120307-004626617901",
"mode" : "RUNONCE",
"startTime" : "2012-03-07T00:46:27.116+0100",
"state" : "SUCCEEDED",
"workflowRuns" : {
"activeWorkflowRunCount" : 0,
"canceledWorkflowRunCount" : 0,
"failedWorkflowRunCount" : 0,
"startedWorkflowRunCount" : 1,
"successfulWorkflowRunCount" : 1
},
"tasks" : {
"canceledTaskCount" : 0,
"createdTaskCount" : 2,
"failedAfterRetryTaskCount" : 0,
"failedWithoutRetryTaskCount" : 0,
"obsoleteTaskCount" : 0,
"retriedAfterErrorTaskCount" : 0,
"retriedAfterTimeoutTaskCount" : 0,
"successfulTaskCount" : 2
},
"worker" : {
"deltaChecker" : {
"warnCount" : 0,
"duration" : 0.235910187,
"duration.iodata" : 7.79314E-4,
"duration.iodata.close" : 2.041E-6,
"duration.iodata.open" : 7.77273E-4,
"duration.perform" : 0.235113728,
"duration.perform.function.checkDeltaState" : 0.19739352200000002,
"duration.perform.input" : 0.0321409,
"duration.perform.input.recordsToCheck" : 0.0321409,
"endTime" : "2012-03-07T00:46:29.785+0100",
"input.recordsToCheck.dataObjectCount" : 1,
"input.recordsToCheck.recordCount" : 29,
"input.recordsToCheck.size" : 7377,
"output.updatedRecords.dataObjectCount" : 1,
"output.updatedRecords.recordCount" : 0,
"output.updatedRecords.size" : 0,
"startTime" : "2012-03-07T00:46:29.266+0100",
"successfulTaskCount" : 1
},
"fileCrawler" : {
"warnCount" : 0,
"duration" : 0.085947071,
"duration.iodata" : 0.038523606,
"duration.iodata.close" : 0.035147121,
"duration.iodata.open" : 0.003376485,
"duration.perform" : 0.047067487,
"duration.perform.output" : 0.023699489,
"duration.perform.output.filesToCrawl" : 0.023699489,
"endTime" : "2012-03-07T00:46:28.415+0100",
"output.filesToCrawl.dataObjectCount" : 1,
"output.filesToCrawl.recordCount" : 29,
"output.filesToCrawl.size" : 7377,
"startTime" : "2012-03-07T00:46:28.125+0100",
"successfulTaskCount" : 1
}
},
"jobDefinition" : {
"name" : "crawlFilesystem",
"readOnly" : true,
"parameters" : {
"tempStore" : "temp",
"dataSource" : "file",
"rootFolder" : "D:/videos",
"jobToPushTo" : "FrameSegmentationJob"
},
"workflow" : "fileCrawling"
},
"workflowDefinition" : {
"name" : "fileCrawling",
"readOnly" : true,
"modes" : [ "runOnce" ],
"startAction" : {
"worker" : "fileCrawler",
"input" : {
"directoriesToCrawl" : "dirsToCrawlBucket"
},
"output" : {
"directoriesToCrawl" : "dirsToCrawlBucket",
"filesToCrawl" : "filesToCrawlBucket"
}
},
"actions" : [ {
"worker" : "deltaChecker",
"input" : {
"recordsToCheck" : "filesToCrawlBucket"
},
"output" : {
"updatedRecords" : "filesToFetchBucket"
}
}, {
"worker" : "fileFetcher",
"input" : {
"filesToFetch" : "filesToFetchBucket"
},
"output" : {
"files" : "filesToPushBucket"
}
}, {
"worker" : "updatePusher",
"input" : {
"recordsToPush" : "filesToPushBucket"
}
} ]
}
}
2012-03-07 00:46:31,812 INFO [pool-5-thread-1 ] zookeeper.ZkConnection - ZkConnection operation time: 240 ms, tries: 1, operation: deleteNode '/smila/jobmanager/jobs/crawlFilesystem/data/startTime'
It seems that the files were crawled but are not pushed correctly to the FrameSegmentationJob, so they never reach the pipeline. I checked with JConsole and found that the pipeline is never invoked.
Am I missing any configuration? Or am I simply doing something wrong?
Thank you,
Nicolò Aquilini