Browse Source

finish up web server example, now with completed thread pool implementation

jmelesky 7 years ago
parent
commit
f25a924c01
2 changed files with 87 additions and 7 deletions
  1. 9 0
      helloweb/src/bin/main.rs
  2. 78 7
      helloweb/src/lib.rs

+ 9 - 0
helloweb/src/bin/main.rs

@@ -22,7 +22,16 @@ fn main() {
     let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
     let pool = ThreadPool::new(4);
 
+    let mut counter = 0;
+
     for stream in listener.incoming() {
+        if counter >= 2 {
+            println!("Shutting down.");
+            break;
+        }
+
+        counter += 1;
+
         // "unwrap" means "just crash if there's an error"
         let stream = stream.unwrap();
 

+ 78 - 7
helloweb/src/lib.rs

@@ -1,10 +1,34 @@
 use std::thread;
-
+use std::sync::mpsc;
+use std::sync::Arc;
+use std::sync::Mutex;
 
 pub struct ThreadPool {
     workers: Vec<Worker>,
+    sender: mpsc::Sender<Message>,
+}
+
+
+trait FnBox {
+    fn call_box(self: Box<Self>);
 }
 
+impl<F: FnOnce()> FnBox for F {
+    fn call_box(self: Box<F>) {
+        (*self)()
+    }
+}
+
+
+
+type Job = Box<FnBox + Send + 'static>;
+
+enum Message {
+    NewJob(Job),
+    Terminate,
+}
+
+
 impl ThreadPool {
     /// Create a new ThreadPool
     ///
@@ -16,14 +40,19 @@ impl ThreadPool {
     pub fn new(size: usize) -> ThreadPool {
         assert!(size > 0);
 
+        let (sender, receiver) = mpsc::channel();
+
+        let receiver = Arc::new(Mutex::new(receiver));
+
         let mut workers = Vec::with_capacity(size);
 
         for id in 0..size {
-            workers.push(Worker::new(id));
+            workers.push(Worker::new(id, receiver.clone()));
         }
 
         ThreadPool {
-            workers
+            workers,
+            sender,
         }
     }
 
@@ -31,22 +60,64 @@ impl ThreadPool {
         where
         F: FnOnce() + Send + 'static
     {
+        let job = Box::new(f);
+
+        self.sender.send(Message::NewJob(job)).unwrap();
+
+    }
+}
+
+
+impl Drop for ThreadPool {
+    fn drop(&mut self) {
+        println!("Terminating worker threads.");
+
+        for _ in &mut self.workers {
+            self.sender.send(Message::Terminate).unwrap();
+        }
+
+        println!("Cleaning up worker threads.");
+
+        for worker in &mut self.workers {
+            println!("Shutting down worker {}", worker.id);
+
+            if let Some(thread) = worker.thread.take() {
+                thread.join().unwrap();
+            }
+        }
     }
 }
 
 
 struct Worker {
     id: usize,
-    thread: thread::JoinHandle<()>,
+    thread: Option<thread::JoinHandle<()>>,
 }
 
 impl Worker {
-    fn new(id: usize) -> Worker {
-        let thread = thread::spawn(|| {});
+    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker { // <-- yikes with the <<<>>>
+        let thread = thread::spawn(move || {
+            loop {
+                let message = receiver.lock().unwrap().recv().unwrap(); // <-- wow
+
+                match message {
+                    Message::NewJob(job) => {
+                        println!("Worker {} got a job; executing.", id);
+
+                        job.call_box();
+                    },
+                    Message::Terminate => {
+                        println!("Worker {} received Terminate; closing.", id);
+
+                        break;
+                    },
+                }
+            }
+        });
 
         Worker {
             id,
-            thread,
+            thread: Some(thread),
         }
     }
 }