Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job queue status to describe API #2464

Merged
merged 5 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions docs/management_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,12 @@ curl http://localhost:8081/models/noop
"gpu": false,
"memoryUsage": 89247744
}
]
],
"jobQueueStatus": {
"remainingCapacity": 100,
"pendingRequests": 0,
"consecutiveFailedRequests": 0
}
}
]
```
Expand Down Expand Up @@ -234,7 +239,12 @@ curl http://localhost:8081/models/noop/2.0
"gpu": false,
"memoryUsage": 89247744
}
]
],
"jobQueueStatus": {
"remainingCapacity": 100,
"pendingRequests": 0,
"consecutiveFailedRequests": 0
}
}
]
```
Expand Down Expand Up @@ -264,7 +274,12 @@ curl http://localhost:8081/models/noop/all
"gpu": false,
"memoryUsage": 89247744
}
]
],
"jobQueueStatus": {
"remainingCapacity": 100,
"pendingRequests": 0,
"consecutiveFailedRequests": 0
}
},
{
"modelName": "noop",
Expand All @@ -284,7 +299,12 @@ curl http://localhost:8081/models/noop/all
"gpu": false,
"memoryUsage": 89247744
}
]
],
"jobQueueStatus": {
"remainingCapacity": 100,
"pendingRequests": 0,
"consecutiveFailedRequests": 0
}
}
]
```
Expand Down Expand Up @@ -401,6 +421,11 @@ curl http://localhost:8081/models/noop-customized/1.0?customized=true
"gpuUsage": "N/A"
}
],
"jobQueueStatus": {
"remainingCapacity": 100,
"pendingRequests": 0,
"consecutiveFailedRequests": 0
},
"customizedMetadata": "{\n \"data1\": \"1\",\n \"data2\": \"2\"\n}"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class DescribeModelResponse {
private boolean loadedAtStartup;

private List<Worker> workers;
private Metrics metrics;
private JobQueueStatus jobQueueStatus;
private String customizedMetadata;

public DescribeModelResponse() {
Expand Down Expand Up @@ -142,12 +142,12 @@ public void addWorker(
workers.add(worker);
}

public Metrics getMetrics() {
return metrics;
public JobQueueStatus getJobQueueStatus() {
return jobQueueStatus;
}

public void setMetrics(Metrics metrics) {
this.metrics = metrics;
public void setJobQueueStatus(JobQueueStatus jobQueueStatus) {
this.jobQueueStatus = jobQueueStatus;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TS provides an SDK that allows cx to customize the endpoint response. It will break cx's customized plugins if class Metrics and funcs getMetrics/setMetrics are removed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated implementation to retain the Metrics class definition as is.

}

public void setCustomizedMetadata(byte[] customizedMetadata) {
Expand Down Expand Up @@ -227,34 +227,34 @@ public void setMemoryUsage(long memoryUsage) {
}
}

public static final class Metrics {
public static final class JobQueueStatus {

private int rejectedRequests;
private int waitingQueueSize;
private int requests;
private int remainingCapacity;
private int pendingRequests;
private int consecutiveFailedRequests;

public int getRejectedRequests() {
return rejectedRequests;
public int getRemainingCapacity() {
return remainingCapacity;
}

public void setRejectedRequests(int rejectedRequests) {
this.rejectedRequests = rejectedRequests;
public void setRemainingCapacity(int remainingCapacity) {
this.remainingCapacity = remainingCapacity;
}

public int getWaitingQueueSize() {
return waitingQueueSize;
public int getPendingRequests() {
return pendingRequests;
}

public void setWaitingQueueSize(int waitingQueueSize) {
this.waitingQueueSize = waitingQueueSize;
public void setPendingRequests(int pendingRequests) {
this.pendingRequests = pendingRequests;
}

public int getRequests() {
return requests;
public int getConsecutiveFailedRequests() {
return consecutiveFailedRequests;
}

public void setRequests(int requests) {
this.requests = requests;
public void setConsecutiveFailedRequests(int consecutiveFailedRequests) {
this.consecutiveFailedRequests = consecutiveFailedRequests;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -435,20 +435,20 @@ private static Operation getDescribeModelOperation(boolean version) {
workers.setItems(worker);

schema.addProperty("workers", workers, true);
Schema metrics = new Schema("object");
metrics.addProperty(
"rejectedRequests",
new Schema("integer", "Number requests has been rejected in last 10 minutes."),
Schema jobQueueStatus = new Schema("object");
jobQueueStatus.addProperty(
"remainingCapacity",
new Schema("integer", "Number of new requests that can be queued."),
true);
metrics.addProperty(
"waitingQueueSize",
new Schema("integer", "Number requests waiting in the queue."),
jobQueueStatus.addProperty(
"pendingRequests",
new Schema("integer", "Number of requests waiting in the queue."),
true);
metrics.addProperty(
"requests",
new Schema("integer", "Number requests processed in last 10 minutes."),
jobQueueStatus.addProperty(
"consecutiveFailedRequests",
new Schema("integer", "Number of consecutive failed requests."),
true);
schema.addProperty("metrics", metrics, true);
schema.addProperty("jobQueueStatus", jobQueueStatus, true);

MediaType mediaType = new MediaType(HttpHeaderValues.APPLICATION_JSON.toString(), schema);
MediaType error = getErrorResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,13 @@ private static DescribeModelResponse createModelResponse(
resp.addWorker(workerId, startTime, isRunning, gpuId, memory, pid, gpuUsage);
}

DescribeModelResponse.JobQueueStatus jobQueueStatus =
new DescribeModelResponse.JobQueueStatus();
jobQueueStatus.setRemainingCapacity(model.getJobQueueRemainingCapacity());
jobQueueStatus.setPendingRequests(model.getPendingRequestsInJobQueue());
jobQueueStatus.setConsecutiveFailedRequests(model.getFailedInfReqs());
resp.setJobQueueStatus(jobQueueStatus);

return resp;
}

Expand Down
22 changes: 22 additions & 0 deletions frontend/server/src/main/java/org/pytorch/serve/wlm/Model.java
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ public void pollBatch(String threadId, long waitTime, Map<String, Job> jobsRepo)
}
}

/**
 * Returns the current value of the failed inference request counter without modifying it.
 *
 * @return the value currently held by {@code failedInfReqs}
 */
public int getFailedInfReqs() {
return failedInfReqs.get();
}

/**
 * Increments the failed inference request counter by one.
 *
 * @return the counter value after the increment
 */
public int incrFailedInfReqs() {
return failedInfReqs.incrementAndGet();
}
Expand Down Expand Up @@ -402,4 +406,22 @@ public synchronized boolean getJobTickets() {
this.numJobTickets.decrementAndGet();
return true;
}

/**
 * Reports how many more jobs the default data queue can accept.
 *
 * @return the remaining capacity of the queue stored under {@code DEFAULT_DATA_QUEUE} in
 *     {@code jobsDb}, or 0 when no such queue is present
 */
public int getJobQueueRemainingCapacity() {
    LinkedBlockingDeque<Job> defaultQueue = jobsDb.get(DEFAULT_DATA_QUEUE);
    return defaultQueue == null ? 0 : defaultQueue.remainingCapacity();
}

/**
 * Reports how many jobs are currently sitting in the default data queue.
 *
 * @return the size of the queue stored under {@code DEFAULT_DATA_QUEUE} in {@code jobsDb},
 *     or 0 when no such queue is present
 */
public int getPendingRequestsInJobQueue() {
    LinkedBlockingDeque<Job> defaultQueue = jobsDb.get(DEFAULT_DATA_QUEUE);
    if (defaultQueue == null) {
        return 0;
    }
    return defaultQueue.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,25 @@ public void testDescribeSpecificModelVersion() throws InterruptedException {
/**
 * Loads {@code noop.mar} as model {@code noop_describe} version {@code 1.11}, describes it
 * through the management channel, and verifies the reported job queue status: a remaining
 * capacity of 100, no pending requests, and no consecutive failed requests.
 *
 * @throws InterruptedException if the wait on the result latch is interrupted
 */
@Test(
        alwaysRun = true,
        dependsOnMethods = {"testDescribeSpecificModelVersion"})
public void testDescribeModelJobQueueStatus() throws InterruptedException {
    testLoadModelWithInitialWorkers("noop.mar", "noop_describe", "1.11");

    Channel managementChannel = TestUtils.getManagementChannel(configManager);
    // Clear the stored result and install a fresh single-count latch, then block on that
    // latch after the describe request has been sent.
    TestUtils.setResult(null);
    TestUtils.setLatch(new CountDownLatch(1));
    TestUtils.describeModel(managementChannel, "noop_describe", "1.11", false);
    TestUtils.getLatch().await();

    DescribeModelResponse[] described =
            JsonUtils.GSON.fromJson(TestUtils.getResult(), DescribeModelResponse[].class);
    DescribeModelResponse.JobQueueStatus queueStatus = described[0].getJobQueueStatus();
    Assert.assertEquals(queueStatus.getRemainingCapacity(), 100);
    Assert.assertEquals(queueStatus.getPendingRequests(), 0);
    Assert.assertEquals(queueStatus.getConsecutiveFailedRequests(), 0);
}

/**
 * Delegates to {@code testPredictions} with the arguments {@code "noopversioned"},
 * {@code "OK"} and {@code "1.11"}; scheduled after {@code testDescribeModelJobQueueStatus}.
 *
 * <p>NOTE(review): the arguments are presumably model name, expected response and model
 * version - confirm against the {@code testPredictions} helper.
 *
 * @throws InterruptedException declared by this method; presumably propagated from the helper
 */
@Test(
alwaysRun = true,
dependsOnMethods = {"testDescribeModelJobQueueStatus"})
public void testNoopVersionedPrediction() throws InterruptedException {
testPredictions("noopversioned", "OK", "1.11");
}
Expand Down
44 changes: 22 additions & 22 deletions frontend/server/src/test/resources/management_open_api.json
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@
"maxWorkers",
"status",
"workers",
"metrics"
"jobQueueStatus"
],
"properties": {
"modelName": {
Expand Down Expand Up @@ -549,25 +549,25 @@
},
"description": "A list of active backend workers."
},
"metrics": {
"jobQueueStatus": {
"type": "object",
"required": [
"rejectedRequests",
"waitingQueueSize",
"requests"
"remainingCapacity",
"pendingRequests",
"consecutiveFailedRequests"
],
"properties": {
"rejectedRequests": {
"remainingCapacity": {
"type": "integer",
"description": "Number requests has been rejected in last 10 minutes."
"description": "Number of new requests that can be queued."
},
"waitingQueueSize": {
"pendingRequests": {
"type": "integer",
"description": "Number requests waiting in the queue."
"description": "Number of requests waiting in the queue."
},
"requests": {
"consecutiveFailedRequests": {
"type": "integer",
"description": "Number requests processed in last 10 minutes."
"description": "Number of consecutive failed requests."
}
}
}
Expand Down Expand Up @@ -1049,7 +1049,7 @@
"maxWorkers",
"status",
"workers",
"metrics"
"jobQueueStatus"
],
"properties": {
"modelName": {
Expand Down Expand Up @@ -1119,25 +1119,25 @@
},
"description": "A list of active backend workers."
},
"metrics": {
"jobQueueStatus": {
"type": "object",
"required": [
"rejectedRequests",
"waitingQueueSize",
"requests"
"remainingCapacity",
"pendingRequests",
"consecutiveFailedRequests"
],
"properties": {
"rejectedRequests": {
"remainingCapacity": {
"type": "integer",
"description": "Number requests has been rejected in last 10 minutes."
"description": "Number of new requests that can be queued."
},
"waitingQueueSize": {
"pendingRequests": {
"type": "integer",
"description": "Number requests waiting in the queue."
"description": "Number of requests waiting in the queue."
},
"requests": {
"consecutiveFailedRequests": {
"type": "integer",
"description": "Number requests processed in last 10 minutes."
"description": "Number of consecutive failed requests."
}
}
}
Expand Down
Loading