Skip to content

Commit cf8bf77

Browse files
authored
EVA-3570 Add jobs for recovering blocks through recovery agent for category ss and rs (#447)
add recovery jobs using monotonic accession recovery agent to recover blocks for category ss and rs
1 parent 948cf3e commit cf8bf77

File tree

39 files changed

+1285
-20
lines changed

39 files changed

+1285
-20
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package uk.ac.ebi.eva.accession.clustering.batch.recovery;
2+
3+
import org.springframework.batch.core.JobExecution;
4+
import uk.ac.ebi.ampt2d.commons.accession.generators.monotonic.MonotonicAccessionRecoveryAgent;
5+
6+
import java.time.LocalDateTime;
7+
8+
public class RSAccessionRecoveryService {
9+
private final static String CATEGORY_ID = "rs";
10+
private MonotonicAccessionRecoveryAgent monotonicAccessionRecoveryAgent;
11+
private JobExecution jobExecution;
12+
private long recoveryCutOffDays;
13+
14+
public RSAccessionRecoveryService(MonotonicAccessionRecoveryAgent monotonicAccessionRecoveryAgent,
15+
long recoveryCutOffDays) {
16+
this.monotonicAccessionRecoveryAgent = monotonicAccessionRecoveryAgent;
17+
this.recoveryCutOffDays = recoveryCutOffDays;
18+
}
19+
20+
public void runRecoveryForCategoryRS() {
21+
LocalDateTime recoveryCutOffTime = LocalDateTime.now().minusDays(recoveryCutOffDays);
22+
monotonicAccessionRecoveryAgent.runRecovery(CATEGORY_ID, jobExecution.getJobId().toString(), recoveryCutOffTime);
23+
}
24+
25+
public void setJobExecution(JobExecution jobExecution) {
26+
this.jobExecution = jobExecution;
27+
}
28+
}

eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/configuration/BeanNames.java

+10
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public class BeanNames {
5555

5656
public static final String PROGRESS_LISTENER = "PROGRESS_LISTENER";
5757

58+
public static final String JOB_EXECUTION_LISTENER = "JOB_EXECUTION_LISTENER";
59+
5860
public static final String ACCESSIONING_SHUTDOWN_STEP = "ACCESSIONING_SHUTDOWN_STEP";
5961

6062
public static final String CLUSTERING_FROM_VCF_STEP = "CLUSTERING_FROM_VCF_STEP";
@@ -95,4 +97,12 @@ public class BeanNames {
9597
public static final String CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER = "CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER";
9698

9799
public static final String RS_SPLIT_WRITER_JOB_EXECUTION_SETTER = "RS_SPLIT_WRITER_JOB_EXECUTION_SETTER";
100+
101+
public static final String RS_ACCESSION_RECOVERY_SERVICE = "RS_ACCESSION_RECOVERY_SERVICE";
102+
103+
public static final String RS_ACCESSION_RECOVERY_STEP = "RS_ACCESSION_RECOVERY_STEP";
104+
105+
public static final String RS_ACCESSION_RECOVERY_JOB = "RS_ACCESSION_RECOVERY_JOB";
106+
107+
public static final String RS_ACCESSION_RECOVERY_JOB_LISTENER = "RS_ACCESSION_RECOVERY_JOB_LISTENER";
98108
}

eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/configuration/batch/jobs/ClusterUnclusteredVariantsJobConfiguration.java

+4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package uk.ac.ebi.eva.accession.clustering.configuration.batch.jobs;
1717

1818
import org.springframework.batch.core.Job;
19+
import org.springframework.batch.core.JobExecutionListener;
1920
import org.springframework.batch.core.Step;
2021
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
2122
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
@@ -28,6 +29,7 @@
2829
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLEAR_RS_MERGE_AND_SPLIT_CANDIDATES_STEP;
2930
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP;
3031
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTER_UNCLUSTERED_VARIANTS_JOB;
32+
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.JOB_EXECUTION_LISTENER;
3133
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROCESS_RS_MERGE_CANDIDATES_STEP;
3234
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROCESS_RS_SPLIT_CANDIDATES_STEP;
3335

@@ -44,6 +46,7 @@ public Job clusteringFromMongoJob(
4446
@Qualifier(CLEAR_RS_MERGE_AND_SPLIT_CANDIDATES_STEP) Step clearRSMergeAndSplitCandidatesStep,
4547
@Qualifier(CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP) Step clusteringNonClusteredVariantsFromMongoStep,
4648
@Qualifier(ACCESSIONING_SHUTDOWN_STEP) Step accessioningShutdownStep,
49+
@Qualifier(JOB_EXECUTION_LISTENER) JobExecutionListener jobExecutionListener,
4750
JobBuilderFactory jobBuilderFactory) {
4851
return jobBuilderFactory.get(CLUSTER_UNCLUSTERED_VARIANTS_JOB)
4952
.incrementer(new RunIdIncrementer())
@@ -52,6 +55,7 @@ public Job clusteringFromMongoJob(
5255
.next(clearRSMergeAndSplitCandidatesStep)
5356
.next(clusteringNonClusteredVariantsFromMongoStep)
5457
.next(accessioningShutdownStep)
58+
.listener(jobExecutionListener)
5559
.build();
5660
}
5761
}

eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/configuration/batch/jobs/ClusteringFromMongoJobConfiguration.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import htsjdk.samtools.util.StringUtil;
1919
import org.springframework.batch.core.Job;
2020
import org.springframework.batch.core.JobExecution;
21+
import org.springframework.batch.core.JobExecutionListener;
2122
import org.springframework.batch.core.Step;
2223
import org.springframework.batch.core.StepContribution;
2324
import org.springframework.batch.core.StepExecution;
@@ -47,6 +48,7 @@
4748
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_CLUSTERED_VARIANTS_FROM_MONGO_STEP;
4849
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_FROM_MONGO_JOB;
4950
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP;
51+
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.JOB_EXECUTION_LISTENER;
5052
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROCESS_RS_MERGE_CANDIDATES_STEP;
5153
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROCESS_RS_SPLIT_CANDIDATES_STEP;
5254

@@ -86,6 +88,7 @@ public Job clusteringFromMongoJob(@Qualifier(CLUSTERING_CLUSTERED_VARIANTS_FROM_
8688
// Back-propagate RS in the remapped assembly that were split or merged
8789
@Qualifier(BACK_PROPAGATE_SPLIT_OR_MERGED_RS_STEP)
8890
Step backPropagateSplitMergedRSStep,
91+
@Qualifier(JOB_EXECUTION_LISTENER) JobExecutionListener jobExecutionListener,
8992
StepBuilderFactory stepBuilderFactory,
9093
JobBuilderFactory jobBuilderFactory,
9194
InputParameters inputParameters) {
@@ -95,6 +98,7 @@ public Job clusteringFromMongoJob(@Qualifier(CLUSTERING_CLUSTERED_VARIANTS_FROM_
9598
.incrementer(new RunIdIncrementer())
9699
//We need the dummy step here because Spring won't conditionally start the first step
97100
.start(dummyStep)
101+
.listener(jobExecutionListener)
98102
.next(jobExecutionDecider)
99103
.on("TRUE")
100104
.to(new FlowBuilder<SimpleFlow>("remappedAssemblyClusteringFlow")
@@ -105,7 +109,8 @@ public Job clusteringFromMongoJob(@Qualifier(CLUSTERING_CLUSTERED_VARIANTS_FROM_
105109
.next(clusteringNonClusteredVariantsFromMongoStep)
106110
.next(accessioningShutdownStep)
107111
.next(backPropagateNewRSStep)
108-
.next(backPropagateSplitMergedRSStep).build())
112+
.next(backPropagateSplitMergedRSStep)
113+
.build())
109114
.on("*").end()
110115
.from(jobExecutionDecider)
111116
.on("FALSE")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package uk.ac.ebi.eva.accession.clustering.configuration.batch.jobs;
2+
3+
import org.springframework.batch.core.Job;
4+
import org.springframework.batch.core.JobExecutionListener;
5+
import org.springframework.batch.core.Step;
6+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
7+
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
8+
import org.springframework.batch.core.launch.support.RunIdIncrementer;
9+
import org.springframework.beans.factory.annotation.Autowired;
10+
import org.springframework.beans.factory.annotation.Qualifier;
11+
import org.springframework.context.annotation.Bean;
12+
import org.springframework.context.annotation.Configuration;
13+
14+
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_JOB;
15+
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_JOB_LISTENER;
16+
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_STEP;
17+
18+
@Configuration
19+
@EnableBatchProcessing
20+
public class RSAccessionRecoveryJobConfiguration {
21+
22+
@Autowired
23+
@Qualifier(RS_ACCESSION_RECOVERY_STEP)
24+
private Step monotonicAccessionRecoveryAgentCategoryRSStep;
25+
26+
@Autowired
27+
@Qualifier(RS_ACCESSION_RECOVERY_JOB_LISTENER)
28+
private JobExecutionListener monotonicAccessionRecoveryAgentCategoryRSJobListener;
29+
30+
@Bean(RS_ACCESSION_RECOVERY_JOB)
31+
public Job createMonotonicAccessionRecoveryAgentCategoryRSJob(JobBuilderFactory jobBuilderFactory) {
32+
return jobBuilderFactory.get(RS_ACCESSION_RECOVERY_JOB)
33+
.incrementer(new RunIdIncrementer())
34+
.start(monotonicAccessionRecoveryAgentCategoryRSStep)
35+
.listener(monotonicAccessionRecoveryAgentCategoryRSJobListener)
36+
.build();
37+
}
38+
39+
}

eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/configuration/batch/jobs/StudyClusteringJobConfiguration.java

+4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package uk.ac.ebi.eva.accession.clustering.configuration.batch.jobs;
1717

1818
import org.springframework.batch.core.Job;
19+
import org.springframework.batch.core.JobExecutionListener;
1920
import org.springframework.batch.core.Step;
2021
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
2122
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
@@ -25,6 +26,7 @@
2526
import org.springframework.context.annotation.Configuration;
2627

2728
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.ACCESSIONING_SHUTDOWN_STEP;
29+
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.JOB_EXECUTION_LISTENER;
2830
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.STUDY_CLUSTERING_JOB;
2931
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.STUDY_CLUSTERING_STEP;
3032

@@ -35,11 +37,13 @@ public class StudyClusteringJobConfiguration {
3537
@Bean(STUDY_CLUSTERING_JOB)
3638
public Job studyClusteringJob(@Qualifier(STUDY_CLUSTERING_STEP) Step clusteringStep,
3739
@Qualifier(ACCESSIONING_SHUTDOWN_STEP) Step accessioningShutdownStep,
40+
@Qualifier(JOB_EXECUTION_LISTENER) JobExecutionListener jobExecutionListener,
3841
JobBuilderFactory jobBuilderFactory) {
3942
return jobBuilderFactory.get(STUDY_CLUSTERING_JOB)
4043
.incrementer(new RunIdIncrementer())
4144
.start(clusteringStep)
4245
.next(accessioningShutdownStep)
46+
.listener(jobExecutionListener)
4347
.build();
4448
}
4549
}

eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/batch/listeners/JobExecutionSetter.java eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/configuration/batch/listeners/JobExecutionSetterConfiguration.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package uk.ac.ebi.eva.accession.clustering.batch.listeners;
1+
package uk.ac.ebi.eva.accession.clustering.configuration.batch.listeners;
22

33
import org.springframework.batch.core.ExitStatus;
44
import org.springframework.batch.core.StepExecution;
@@ -17,7 +17,7 @@
1717
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_SPLIT_WRITER_JOB_EXECUTION_SETTER;
1818

1919
@Configuration
20-
public class JobExecutionSetter {
20+
public class JobExecutionSetterConfiguration {
2121
@Bean(NON_CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER)
2222
public StepExecutionListener getNonClusteredClusteringWriterJobExecutionSetter(
2323
@Qualifier(NON_CLUSTERED_CLUSTERING_WRITER) ClusteringWriter nonClusteredClusteringWriter) {

eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/configuration/batch/listeners/ListenersConfiguration.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package uk.ac.ebi.eva.accession.clustering.configuration.batch.listeners;
22

3+
import org.springframework.batch.core.JobExecution;
4+
import org.springframework.batch.core.JobExecutionListener;
35
import org.springframework.beans.factory.annotation.Qualifier;
46
import org.springframework.context.annotation.Bean;
57
import org.springframework.context.annotation.Configuration;
@@ -8,16 +10,18 @@
810
import uk.ac.ebi.eva.accession.clustering.batch.listeners.ClusteringProgressListener;
911
import uk.ac.ebi.eva.accession.clustering.metric.ClusteringMetricCompute;
1012
import uk.ac.ebi.eva.accession.clustering.parameters.InputParameters;
13+
import uk.ac.ebi.eva.accession.core.service.nonhuman.ClusteredVariantAccessioningService;
14+
import uk.ac.ebi.eva.accession.core.service.nonhuman.SubmittedVariantAccessioningService;
1115
import uk.ac.ebi.eva.metrics.configuration.MetricConfiguration;
1216
import uk.ac.ebi.eva.metrics.count.CountServiceParameters;
1317
import uk.ac.ebi.eva.metrics.metric.MetricCompute;
1418

19+
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.JOB_EXECUTION_LISTENER;
1520
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROGRESS_LISTENER;
1621

1722
@Configuration
1823
@Import({MetricConfiguration.class})
1924
public class ListenersConfiguration {
20-
2125
@Bean(PROGRESS_LISTENER)
2226
public ClusteringProgressListener clusteringProgressListener(InputParameters parameters, MetricCompute metricCompute) {
2327
return new ClusteringProgressListener(parameters, metricCompute);
@@ -28,6 +32,21 @@ public MetricCompute getClusteringMetricCompute(CountServiceParameters countServ
2832
@Qualifier("COUNT_STATS_REST_TEMPLATE") RestTemplate restTemplate,
2933
InputParameters inputParameters) {
3034
return new ClusteringMetricCompute(countServiceParameters, restTemplate, inputParameters.getAssemblyAccession(),
31-
inputParameters.getProjects());
35+
inputParameters.getProjects());
36+
}
37+
38+
@Bean(JOB_EXECUTION_LISTENER)
39+
public JobExecutionListener jobExecutionListener(SubmittedVariantAccessioningService submittedVariantAccessioningService,
40+
ClusteredVariantAccessioningService clusteredVariantAccessioningService) {
41+
return new JobExecutionListener() {
42+
@Override
43+
public void beforeJob(JobExecution jobExecution) {}
44+
45+
@Override
46+
public void afterJob(JobExecution jobExecution) {
47+
submittedVariantAccessioningService.shutDownAccessionGenerator();
48+
clusteredVariantAccessioningService.shutDownAccessionGenerator();
49+
}
50+
};
3251
}
3352
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package uk.ac.ebi.eva.accession.clustering.configuration.batch.listeners;
2+
3+
import org.springframework.batch.core.JobExecution;
4+
import org.springframework.batch.core.JobExecutionListener;
5+
import org.springframework.beans.factory.annotation.Qualifier;
6+
import org.springframework.context.annotation.Bean;
7+
import org.springframework.context.annotation.Configuration;
8+
import uk.ac.ebi.eva.accession.clustering.batch.recovery.RSAccessionRecoveryService;
9+
10+
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_JOB_LISTENER;
11+
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_SERVICE;
12+
13+
@Configuration
14+
public class RSAccessionRecoveryJobListenerConfiguration {
15+
@Bean(RS_ACCESSION_RECOVERY_JOB_LISTENER)
16+
public JobExecutionListener jobExecutionListener(@Qualifier(RS_ACCESSION_RECOVERY_SERVICE)
17+
RSAccessionRecoveryService RSAccessionRecoveryService) {
18+
return new JobExecutionListener() {
19+
@Override
20+
public void beforeJob(JobExecution jobExecution) {
21+
RSAccessionRecoveryService.setJobExecution(jobExecution);
22+
}
23+
24+
@Override
25+
public void afterJob(JobExecution jobExecution) {
26+
}
27+
};
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package uk.ac.ebi.eva.accession.clustering.configuration.batch.recovery;
2+
3+
import org.springframework.beans.factory.annotation.Autowired;
4+
import org.springframework.beans.factory.annotation.Value;
5+
import org.springframework.context.annotation.Bean;
6+
import org.springframework.context.annotation.Configuration;
7+
import uk.ac.ebi.ampt2d.commons.accession.generators.monotonic.MonotonicAccessionRecoveryAgent;
8+
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.ContiguousIdBlockService;
9+
import uk.ac.ebi.eva.accession.clustering.batch.recovery.RSAccessionRecoveryService;
10+
import uk.ac.ebi.eva.accession.core.service.nonhuman.eva.ClusteredVariantAccessioningDatabaseService;
11+
12+
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_SERVICE;
13+
14+
@Configuration
15+
public class RSAccessionRecoveryServiceConfiguration {
16+
@Autowired
17+
private ContiguousIdBlockService blockService;
18+
@Autowired
19+
private ClusteredVariantAccessioningDatabaseService clusteredVariantAccessioningDatabaseService;
20+
21+
@Value("${recovery.cutoff.days}")
22+
private long recoveryCutOffDays;
23+
24+
@Bean(RS_ACCESSION_RECOVERY_SERVICE)
25+
public RSAccessionRecoveryService getMonotonicAccessionRecoveryAgentCategoryRSService() {
26+
return new RSAccessionRecoveryService(getMonotonicAccessionRecoveryAgent(), recoveryCutOffDays);
27+
}
28+
29+
private MonotonicAccessionRecoveryAgent getMonotonicAccessionRecoveryAgent() {
30+
return new MonotonicAccessionRecoveryAgent(blockService, clusteredVariantAccessioningDatabaseService);
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package uk.ac.ebi.eva.accession.clustering.configuration.batch.steps;
2+
3+
import org.springframework.batch.core.Step;
4+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
5+
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
6+
import org.springframework.beans.factory.annotation.Autowired;
7+
import org.springframework.beans.factory.annotation.Qualifier;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import uk.ac.ebi.eva.accession.clustering.batch.recovery.RSAccessionRecoveryService;
11+
12+
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_SERVICE;
13+
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_STEP;
14+
15+
@Configuration
16+
@EnableBatchProcessing
17+
public class RSAccessionRecoveryStepConfiguration {
18+
@Autowired
19+
@Qualifier(RS_ACCESSION_RECOVERY_SERVICE)
20+
private RSAccessionRecoveryService RSAccessionRecoveryService;
21+
22+
@Bean(RS_ACCESSION_RECOVERY_STEP)
23+
public Step monotonicAccessionRecoveryAgentCategoryRSStep(StepBuilderFactory stepBuilderFactory) {
24+
return stepBuilderFactory.get(RS_ACCESSION_RECOVERY_STEP)
25+
.tasklet((contribution, chunkContext) -> {
26+
RSAccessionRecoveryService.runRecoveryForCategoryRS();
27+
return null;
28+
})
29+
.build();
30+
}
31+
}

0 commit comments

Comments
 (0)