Skip to content

Commit 5bfe676

Browse files
hengfeiyang and oasisk authored
perf: some improve (openobserve#1008)
Co-authored-by: oasisk <[email protected]>
1 parent a08c1bc commit 5bfe676

File tree

30 files changed

+381
-282
lines changed

30 files changed

+381
-282
lines changed

src/handler/http/auth/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ pub async fn validator_gcp(
201201
let query =
202202
web::Query::<std::collections::HashMap<String, String>>::from_query(req.query_string())
203203
.unwrap();
204-
match query.get("API_Key") {
204+
match query.get("API-Key") {
205205
Some(val) => {
206206
let gcp_creds = common::base64::decode(val).unwrap();
207207
let creds = gcp_creds

src/handler/http/request/search/mod.rs

+14-14
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ pub async fn search(
137137
let time = start.elapsed().as_secs_f64();
138138
metrics::HTTP_RESPONSE_TIME
139139
.with_label_values(&[
140-
"/_search",
140+
"/api/org/_search",
141141
"200",
142142
&org_id,
143143
"",
@@ -146,7 +146,7 @@ pub async fn search(
146146
.observe(time);
147147
metrics::HTTP_INCOMING_REQUESTS
148148
.with_label_values(&[
149-
"/_search",
149+
"/api/org/_search",
150150
"200",
151151
&org_id,
152152
"",
@@ -160,7 +160,7 @@ pub async fn search(
160160
let time = start.elapsed().as_secs_f64();
161161
metrics::HTTP_RESPONSE_TIME
162162
.with_label_values(&[
163-
"/_search",
163+
"/api/org/_search",
164164
"500",
165165
&org_id,
166166
"",
@@ -169,7 +169,7 @@ pub async fn search(
169169
.observe(time);
170170
metrics::HTTP_INCOMING_REQUESTS
171171
.with_label_values(&[
172-
"/_search",
172+
"/api/org/_search",
173173
"500",
174174
&org_id,
175175
"",
@@ -311,7 +311,7 @@ pub async fn around(
311311
let time = start.elapsed().as_secs_f64();
312312
metrics::HTTP_RESPONSE_TIME
313313
.with_label_values(&[
314-
"/_around",
314+
"/api/org/_around",
315315
"500",
316316
&org_id,
317317
&stream_name,
@@ -320,7 +320,7 @@ pub async fn around(
320320
.observe(time);
321321
metrics::HTTP_INCOMING_REQUESTS
322322
.with_label_values(&[
323-
"/_around",
323+
"/api/org/_around",
324324
"500",
325325
&org_id,
326326
&stream_name,
@@ -363,7 +363,7 @@ pub async fn around(
363363
let time = start.elapsed().as_secs_f64();
364364
metrics::HTTP_RESPONSE_TIME
365365
.with_label_values(&[
366-
"/_around",
366+
"/api/org/_around",
367367
"500",
368368
&org_id,
369369
&stream_name,
@@ -372,7 +372,7 @@ pub async fn around(
372372
.observe(time);
373373
metrics::HTTP_INCOMING_REQUESTS
374374
.with_label_values(&[
375-
"/_around",
375+
"/api/org/_around",
376376
"500",
377377
&org_id,
378378
&stream_name,
@@ -410,7 +410,7 @@ pub async fn around(
410410
let time = start.elapsed().as_secs_f64();
411411
metrics::HTTP_RESPONSE_TIME
412412
.with_label_values(&[
413-
"/_around",
413+
"/api/org/_around",
414414
"200",
415415
&org_id,
416416
&stream_name,
@@ -419,7 +419,7 @@ pub async fn around(
419419
.observe(time);
420420
metrics::HTTP_INCOMING_REQUESTS
421421
.with_label_values(&[
422-
"/_around",
422+
"/api/org/_around",
423423
"200",
424424
&org_id,
425425
&stream_name,
@@ -548,7 +548,7 @@ pub async fn values(
548548
let time = start.elapsed().as_secs_f64();
549549
metrics::HTTP_RESPONSE_TIME
550550
.with_label_values(&[
551-
"/_values",
551+
"/api/org/_values",
552552
"500",
553553
&org_id,
554554
&stream_name,
@@ -557,7 +557,7 @@ pub async fn values(
557557
.observe(time);
558558
metrics::HTTP_INCOMING_REQUESTS
559559
.with_label_values(&[
560-
"/_values",
560+
"/api/org/_values",
561561
"500",
562562
&org_id,
563563
&stream_name,
@@ -593,7 +593,7 @@ pub async fn values(
593593
let time = start.elapsed().as_secs_f64();
594594
metrics::HTTP_RESPONSE_TIME
595595
.with_label_values(&[
596-
"/_values",
596+
"/api/org/_values",
597597
"200",
598598
&org_id,
599599
&stream_name,
@@ -602,7 +602,7 @@ pub async fn values(
602602
.observe(time);
603603
metrics::HTTP_INCOMING_REQUESTS
604604
.with_label_values(&[
605-
"/_values",
605+
"/api/org/_values",
606606
"200",
607607
&org_id,
608608
&stream_name,

src/infra/cluster.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ impl std::fmt::Display for Role {
9898
/// Register and keepalive the node to cluster
9999
pub async fn register_and_keepalive() -> Result<()> {
100100
if CONFIG.common.local_mode {
101-
let nodes = load_local_node_role();
102-
if !is_single_node(nodes) {
101+
let roles = load_local_node_role();
102+
if !is_single_node(&roles) {
103103
panic!("For local mode only single node deployment is allowed !");
104104
}
105105
// cache local node
@@ -439,7 +439,7 @@ pub fn is_alert_manager(role: &[Role]) -> bool {
439439
}
440440

441441
#[inline(always)]
442-
pub fn is_single_node(role: Vec<Role>) -> bool {
442+
pub fn is_single_node(role: &[Role]) -> bool {
443443
role.contains(&Role::All)
444444
}
445445

src/infra/config.rs

+9-4
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use dashmap::DashMap;
15+
use dashmap::{DashMap, DashSet};
1616
use datafusion::arrow::datatypes::Schema;
1717
use dotenv_config::EnvConfig;
1818
use dotenvy::dotenv;
@@ -32,6 +32,7 @@ use crate::meta::user::User;
3232
use crate::service::enrichment::StreamTable;
3333

3434
pub type RwHashMap<K, V> = DashMap<K, V, ahash::RandomState>;
35+
pub type RwHashSet<K> = DashSet<K, ahash::RandomState>;
3536

3637
pub static VERSION: &str = env!("GIT_VERSION");
3738
pub static COMMIT_HASH: &str = env!("GIT_COMMIT_HASH");
@@ -184,6 +185,8 @@ pub struct Common {
184185
pub column_timestamp: String,
185186
#[env_config(name = "ZO_WIDENING_SCHEMA_EVOLUTION", default = false)]
186187
pub widening_schema_evolution: bool,
188+
#[env_config(name = "ZO_SKIP_SCHEMA_VALIDATION", default = false)]
189+
pub skip_schema_validation: bool,
187190
#[env_config(name = "ZO_FEATURE_PER_THREAD_LOCK", default = false)]
188191
pub feature_per_thread_lock: bool,
189192
#[env_config(name = "ZO_FEATURE_FULLTEXT_ON_ALL_FIELDS", default = false)]
@@ -216,8 +219,6 @@ pub struct Common {
216219
pub prometheus_enabled: bool,
217220
#[env_config(name = "ZO_PRINT_KEY_CONFIG", default = false)]
218221
pub print_key_config: bool,
219-
#[env_config(name = "ZO_SKIP_SCHEMA_VALIDATION", default = false)]
220-
pub skip_schema_validation: bool,
221222
}
222223

223224
#[derive(EnvConfig)]
@@ -265,6 +266,8 @@ pub struct Compact {
265266
pub max_file_size: u64,
266267
#[env_config(name = "ZO_COMPACT_DATA_RETENTION_DAYS", default = 3650)] // in days
267268
pub data_retention_days: i64,
269+
#[env_config(name = "ZO_COMPACT_BLOCKED_ORGS", default = "")] // use comma to split
270+
pub blocked_orgs: String,
268271
}
269272

270273
#[derive(EnvConfig)]
@@ -314,7 +317,7 @@ pub struct Etcd {
314317
pub connect_timeout: u64,
315318
#[env_config(name = "ZO_ETCD_COMMAND_TIMEOUT", default = 5)]
316319
pub command_timeout: u64,
317-
#[env_config(name = "ZO_ETCD_LOCK_WAIT_TIMEOUT", default = 600)]
320+
#[env_config(name = "ZO_ETCD_LOCK_WAIT_TIMEOUT", default = 3600)]
318321
pub lock_wait_timeout: u64,
319322
#[env_config(name = "ZO_ETCD_USER", default = "")]
320323
pub user: String,
@@ -360,6 +363,8 @@ pub struct S3 {
360363
pub bucket_prefix: String,
361364
#[env_config(name = "ZO_S3_CONNECT_TIMEOUT", default = 10)] // seconds
362365
pub connect_timeout: u64,
366+
#[env_config(name = "ZO_S3_REQUEST_TIMEOUT", default = 3600)] // seconds
367+
pub request_timeout: u64,
363368
#[env_config(name = "ZO_S3_FEATURE_FORCE_PATH_STYLE", default = false)]
364369
pub feature_force_path_style: bool,
365370
}

src/infra/storage/remote.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ fn init_aws_config() -> object_store::Result<object_store::aws::AmazonS3> {
183183
.with_client_options(
184184
object_store::ClientOptions::default()
185185
.with_connect_timeout(std::time::Duration::from_secs(CONFIG.s3.connect_timeout))
186+
.with_timeout(std::time::Duration::from_secs(CONFIG.s3.request_timeout))
186187
.with_allow_http(true),
187188
)
188189
.with_profile("default")
@@ -207,7 +208,8 @@ fn init_azure_config() -> object_store::Result<object_store::azure::MicrosoftAzu
207208
let mut builder = object_store::azure::MicrosoftAzureBuilder::from_env()
208209
.with_client_options(
209210
object_store::ClientOptions::default()
210-
.with_connect_timeout(std::time::Duration::from_secs(CONFIG.s3.connect_timeout)),
211+
.with_connect_timeout(std::time::Duration::from_secs(CONFIG.s3.connect_timeout))
212+
.with_timeout(std::time::Duration::from_secs(CONFIG.s3.request_timeout)),
211213
)
212214
.with_container_name(&CONFIG.s3.bucket_name);
213215
if !CONFIG.s3.access_key.is_empty() {
@@ -223,7 +225,8 @@ fn init_gcp_config() -> object_store::Result<object_store::gcp::GoogleCloudStora
223225
let mut builder = object_store::gcp::GoogleCloudStorageBuilder::from_env()
224226
.with_client_options(
225227
object_store::ClientOptions::default()
226-
.with_connect_timeout(std::time::Duration::from_secs(CONFIG.s3.connect_timeout)),
228+
.with_connect_timeout(std::time::Duration::from_secs(CONFIG.s3.connect_timeout))
229+
.with_timeout(std::time::Duration::from_secs(CONFIG.s3.request_timeout)),
227230
)
228231
.with_bucket_name(&CONFIG.s3.bucket_name);
229232
if !CONFIG.s3.access_key.is_empty() {

src/job/files/disk.rs

+26-35
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@ use crate::infra::{cluster, config::CONFIG, metrics, storage, wal};
2626
use crate::meta::{common::FileMeta, StreamType};
2727
use crate::service::{db, schema::schema_evolution, search::datafusion::new_writer};
2828

29-
/// TaskResult (local file path, remote file key, file meta, stream type)
30-
type TaskResult = (String, String, FileMeta, StreamType);
31-
3229
pub async fn run() -> Result<(), anyhow::Error> {
3330
if !cluster::is_ingester(&cluster::LOCAL_NODE_ROLE) {
3431
return Ok(()); // not an ingester, no need to init job
@@ -59,8 +56,7 @@ async fn move_files_to_storage() -> Result<(), anyhow::Error> {
5956
let files = scan_files(&pattern);
6057

6158
// use multiple threads to upload files
62-
let mut tasks: Vec<task::JoinHandle<Result<TaskResult, anyhow::Error>>> = Vec::new();
63-
59+
let mut tasks = Vec::new();
6460
let semaphore = std::sync::Arc::new(Semaphore::new(CONFIG.limit.file_move_thread_num));
6561
for file in files {
6662
let local_file = file.to_owned();
@@ -117,33 +113,22 @@ async fn move_files_to_storage() -> Result<(), anyhow::Error> {
117113
}
118114

119115
let permit = semaphore.clone().acquire_owned().await.unwrap();
120-
let task: task::JoinHandle<Result<(String, String, FileMeta, StreamType), anyhow::Error>> =
121-
task::spawn(async move {
122-
let ret = upload_file(
123-
&org_id,
124-
&stream_name,
125-
stream_type,
126-
&local_file,
127-
partition_key,
128-
)
129-
.await;
130-
drop(permit);
131-
match ret {
132-
Ok((key, meta, stream_type)) => Ok((local_file, key, meta, stream_type)),
133-
Err(e) => Err(e),
134-
}
135-
});
136-
tasks.push(task);
137-
task::yield_now().await;
138-
}
139-
140-
for task in tasks {
141-
match task.await {
142-
Ok(ret) => match ret {
143-
Ok((path, key, meta, _stream_type)) => {
116+
let task: task::JoinHandle<Result<(), anyhow::Error>> = task::spawn(async move {
117+
let ret = upload_file(
118+
&org_id,
119+
&stream_name,
120+
stream_type,
121+
&local_file,
122+
partition_key,
123+
)
124+
.await;
125+
drop(permit);
126+
match ret {
127+
Err(e) => log::error!("[JOB] Error while uploading disk file to storage {}", e),
128+
Ok((key, meta, _stream_type)) => {
144129
match db::file_list::local::set(&key, meta, false).await {
145130
Ok(_) => {
146-
match fs::remove_file(&path) {
131+
match fs::remove_file(&local_file) {
147132
Ok(_) => {
148133
// metrics
149134
let columns = key.split('/').collect::<Vec<&str>>();
@@ -158,22 +143,28 @@ async fn move_files_to_storage() -> Result<(), anyhow::Error> {
158143
Err(e) => {
159144
log::error!(
160145
"[JOB] Failed to remove disk file from disk: {}, {}",
161-
path,
146+
local_file,
162147
e.to_string()
163148
)
164149
}
165150
}
166151
}
167152
Err(e) => log::error!(
168153
"[JOB] Failed write disk file meta:{}, error: {}",
169-
path,
154+
local_file,
170155
e.to_string()
171156
),
172157
}
173158
}
174-
Err(e) => log::error!("[JOB] Error while uploading disk file to storage {}", e),
175-
},
176-
Err(e) => log::error!("[JOB] Error while uploading disk file to storage {}", e),
159+
};
160+
Ok(())
161+
});
162+
tasks.push(task);
163+
}
164+
165+
for task in tasks {
166+
if let Err(e) = task.await {
167+
log::error!("[JOB] Error while uploading disk file to storage {}", e);
177168
};
178169
}
179170
Ok(())

0 commit comments

Comments
 (0)