Skip to content

Commit 2d9d6a5

Browse files
authored
Hyper everywhere (toshi-search#334)
* Hyper everywhere * Hyper everywhere * Some client clean up * just gotta make the tests work * just gotta make the tests work * Most of the test work now, I just gotta do delete terms now * Some style and convenience tweaks * Make a test not fail * Remote delete of terms from an index
1 parent b870690 commit 2d9d6a5

File tree

24 files changed

+752
-1386
lines changed

24 files changed

+752
-1386
lines changed

Cargo.lock

Lines changed: 231 additions & 897 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,19 @@ members = [ "./", "toshi-proto" ]
1919
[dependencies]
2020
toshi-proto = { path = "toshi-proto" }
2121
tower = "^0.1"
22-
tower-service = "^0.2"
2322
tower-buffer = "^0.1"
24-
tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" }
25-
tower-h2 = { git = "https://github.com/tower-rs/tower-h2" }
26-
tower-http = { git = "https://github.com/tower-rs/tower-http" }
23+
tower-grpc = { git = "https://github.com/tower-rs/tower-grpc", features = ["tower-hyper"] }
24+
tower-hyper = { git = "https://github.com/tower-rs/tower-hyper" }
2725
tower-request-modifier = { git = "https://github.com/tower-rs/tower-http" }
2826
tower-consul = { git = "https://github.com/LucioFranco/tower-consul" }
2927
futures-watch = { git = "https://github.com/carllerche/better-future" }
30-
tower-web = "^0.3"
3128
http = "^0.1"
32-
h2 = "^0.1"
33-
flate2 = "^1.0"
3429
bytes = "^0.4"
35-
prost = "^0.5"
36-
prost-derive = "^0.5"
3730
hyper = "^0.12"
38-
hyper-tls = "^0.3"
3931
serde_json = "^1.0"
4032
futures = "^0.1"
4133
tantivy = "^0.9"
4234
tokio = "^0.1"
43-
tokio-executor = "^0.1"
4435
tokio-signal = "^0.2"
4536
config = "^0.9"
4637
log = "^0.4"

config/config.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
host = "127.0.0.1"
22
port = 8080
3-
path = "data2/"
3+
path = "data/"
44
writer_memory = 200000000
55
log_level = "info"
66
json_parsing_threads = 4
77
bulk_buffer_size = 10000
88
auto_commit_duration = 10
9-
experimental = true
9+
experimental = false
1010

1111
[experimental_features]
1212
master = true
1313
nodes = [
14-
"[::1]:8081"
14+
"127.0.0.1:8081"
1515
]
1616

1717
[merge_policy]

src/bin/toshi.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,11 @@ fn run_master(catalog: Arc<RwLock<IndexCatalog>>, settings: &Settings) -> impl F
128128
let update = catalog.read().unwrap().update_remote_indexes();
129129
tokio::spawn(update);
130130
}
131-
router_with_catalog(&bind, &catalog)
131+
router_with_catalog(&bind, Arc::clone(&catalog))
132132
});
133133
future::Either::A(run)
134134
} else {
135-
let run = commit_watcher.and_then(move |_| router_with_catalog(&bind, &catalog));
135+
let run = commit_watcher.and_then(move |_| router_with_catalog(&bind, Arc::clone(&catalog)));
136136
future::Either::B(run)
137137
}
138138
}

src/cluster/consul.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use hyper::body::Body;
77
use hyper::client::HttpConnector;
88
use hyper::http::uri::Scheme;
99
use hyper::{Client, Request, Response, Uri};
10-
use hyper_tls::HttpsConnector;
1110
use serde::{Deserialize, Serialize};
11+
use tower::Service;
1212
use tower_consul::{Consul as TowerConsul, ConsulService, KVValue};
1313

1414
use crate::cluster::shard::PrimaryShard;
@@ -203,19 +203,19 @@ impl Builder {
203203

204204
#[derive(Clone)]
205205
pub struct HttpsService {
206-
client: Client<HttpsConnector<HttpConnector>>,
206+
client: Client<HttpConnector>,
207207
}
208208

209209
impl HttpsService {
210210
fn new() -> Self {
211-
let https = HttpsConnector::new(4).expect("Could not create TLS for Hyper");
211+
let https = HttpConnector::new(4);
212212
let client = Client::builder().build::<_, hyper::Body>(https);
213213

214214
HttpsService { client }
215215
}
216216
}
217217

218-
impl tower_service::Service<Request<Bytes>> for HttpsService {
218+
impl Service<Request<Bytes>> for HttpsService {
219219
type Response = Response<Bytes>;
220220
type Error = hyper::Error;
221221
type Future = Box<Future<Item = Self::Response, Error = Self::Error> + Send>;

src/cluster/mod.rs

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@ use std::net::SocketAddr;
33
use std::time::Duration;
44

55
use failure::Fail;
6-
use futures::{future, Future, Poll};
6+
use futures::{future, Future};
77
use log::error;
88
use serde::{Deserialize, Serialize};
9-
use tokio::net::tcp::ConnectFuture;
10-
use tokio::net::TcpStream;
11-
use tower_h2::client::ConnectError;
9+
use tower_hyper::client::ConnectError;
1210

1311
use crate::cluster::consul::Hosts;
1412
use crate::settings::Settings;
@@ -150,20 +148,3 @@ impl From<tower_grpc::Status> for RPCError {
150148
RPCError::RPCError(err)
151149
}
152150
}
153-
154-
#[derive(Debug, Clone)]
155-
pub struct GrpcConn(pub SocketAddr);
156-
157-
impl tower_service::Service<()> for GrpcConn {
158-
type Response = TcpStream;
159-
type Error = io::Error;
160-
type Future = ConnectFuture;
161-
162-
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
163-
Ok(().into())
164-
}
165-
166-
fn call(&mut self, _: ()) -> Self::Future {
167-
TcpStream::connect(&self.0)
168-
}
169-
}

src/cluster/placement/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,11 @@ use futures_watch::Watch;
66
use log::error;
77
use tokio::net::TcpListener;
88
use tower_grpc::{Request, Response, Status};
9-
use tower_h2::Server;
109

1110
use crate::cluster::consul::Consul;
1211
use toshi_proto::placement_proto::{server, PlacementReply, PlacementRequest};
1312

1413
pub use self::background::Background;
15-
use tokio_executor::DefaultExecutor;
1614

1715
pub mod background;
1816

@@ -35,14 +33,14 @@ impl Place {
3533
let placer = Place { consul, nodes };
3634
let placement = server::PlacementServer::new(placer);
3735

38-
// let mut hyp = tower_hyper::server::Server::new(placement);
39-
let mut h2 = Server::new(placement, Default::default(), DefaultExecutor::current());
36+
let mut hyp = tower_hyper::server::Server::new(placement);
37+
4038
bind.incoming().for_each(move |stream| {
4139
if let Err(e) = stream.set_nodelay(true) {
4240
return Err(e);
4341
}
4442

45-
let serve = h2.serve(stream);
43+
let serve = hyp.serve(stream);
4644
tokio::spawn(serve.map_err(|e| error!("Placement Server Error: {:?}", e)));
4745

4846
Ok(())

src/cluster/remote_handle.rs

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use rand::prelude::*;
55
use tokio::prelude::*;
66
use tower_grpc::{Request as TowerRequest, Response};
77

8-
use toshi_proto::cluster_rpc::{DocumentRequest, ResultReply, SearchReply, SearchRequest};
8+
use toshi_proto::cluster_rpc::{DeleteRequest, DocumentRequest, SearchReply, SearchRequest};
99

1010
use crate::cluster::rpc_server::RpcClient;
1111
use crate::cluster::RPCError;
@@ -49,7 +49,7 @@ impl RemoteIndex {
4949

5050
impl IndexHandle for RemoteIndex {
5151
type SearchResponse = Box<Future<Item = Vec<SearchReply>, Error = RPCError> + Send>;
52-
type DeleteResponse = Box<Future<Item = ResultReply, Error = RPCError> + Send>;
52+
type DeleteResponse = Box<Future<Item = Vec<i32>, Error = RPCError> + Send>;
5353
type AddResponse = Box<Future<Item = Vec<i32>, Error = RPCError> + Send>;
5454

5555
fn get_name(&self) -> String {
@@ -93,7 +93,7 @@ impl IndexHandle for RemoteIndex {
9393
Err(_) => Vec::new(),
9494
};
9595
let req = TowerRequest::new(DocumentRequest {
96-
index: name.clone(),
96+
index: name,
9797
document: bytes,
9898
});
9999
client
@@ -111,7 +111,30 @@ impl IndexHandle for RemoteIndex {
111111
Box::new(future::join_all(fut))
112112
}
113113

114-
fn delete_term(&mut self, _: DeleteDoc) -> Self::DeleteResponse {
115-
unimplemented!()
114+
fn delete_term(&self, delete: DeleteDoc) -> Self::DeleteResponse {
115+
let name = self.name.clone();
116+
let clients = self.remotes.clone();
117+
let fut = clients.into_iter().map(move |mut client| {
118+
let bytes = match serde_json::to_vec(&delete) {
119+
Ok(v) => v,
120+
Err(_) => Vec::new(),
121+
};
122+
let req = TowerRequest::new(DeleteRequest {
123+
index: name.clone(),
124+
terms: bytes,
125+
});
126+
client
127+
.delete_document(req)
128+
.map(|res| {
129+
info!("RESPONSE = {:?}", res);
130+
res.into_inner().code
131+
})
132+
.map_err(|e| {
133+
info!("ERR = {:?}", e);
134+
e.into()
135+
})
136+
});
137+
138+
Box::new(future::join_all(fut))
116139
}
117140
}

0 commit comments

Comments
 (0)