
【応用編】for select で作る堅牢なワーカープール
前回の記事では for select の基本を学びました。今回はこれを応用して、実務で頻出する 「ワーカープール」 パターンを実装してみましょう。
前回の記事 taglibrary.hatenablog.com
なぜワーカープールが必要なのか?
単純に go func() をループで回すと、タスクが1万個あれば1万個のゴルーチンが立ち上がってしまいます。これではメモリを食いつぶし、CPUのスイッチングコストも増大して逆に遅くなることがあります。
ワーカープールを使うと、「5人の作業員(ゴルーチン)で、100個の荷物(タスク)を順番に処理する」 といった制御が可能になります。
実装コード:キャンセル可能なワーカー
ここでは、context を使って 「いつでも停止可能」 かつ 「並行数を制限した」 ワーカープールを作ります。
package main import ( "context" "fmt" "sync" "time" ) // ワーカー:ジョブを受け取って処理する関数 func worker(id int, jobs <-chan int, results chan<- int, ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() // 関数終了時にWaitGroupを減らす fmt.Printf("ワーカー %d: 起動\n", id) for { select { // 1. キャンセル信号(停止命令)の監視 case <-ctx.Done(): fmt.Printf("ワーカー %d: 停止信号を受信、終了します\n", id) return // 2. ジョブの受信と処理 case job, ok := <-jobs: if !ok { // ジョブチャネルが閉じられたら終了 fmt.Printf("ワーカー %d: 全ジョブ完了\n", id) return } // 実際の処理(ここでは重い処理をシミュレート) fmt.Printf("ワーカー %d: ジョブ %d を開始\n", id, job) time.Sleep(1 * time.Second) // 結果を送信 results <- job * 2 } } } func main() { const numWorkers = 3 // ワーカー(作業員)の数 const numJobs = 10 // ジョブ(タスク)の数 // チャネルとContextの準備 jobs := make(chan int, numJobs) results := make(chan int, numJobs) ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup // 1. ワーカーを起動(3つだけ立ち上げる) for w := 1; w <= numWorkers; w++ { wg.Add(1) go worker(w, jobs, results, ctx, &wg) } // 2. ジョブを投入 // (別ゴルーチンにしないと、バッファが溢れた場合にブロックするため) go func() { for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) // 全ジョブ投入後にチャネルを閉じる }() // 3. 結果の収集(デモのため、少し待ってからキャンセルしてみる) go func() { // 全ワーカーが終了するのを待つゴルーチン wg.Wait() close(results) }() // 結果を表示 // ※注意: 実験として、途中で処理を強制中断させてみます stopTimer := time.After(3 * time.Second) Loop: for { select { case res, ok := <-results: if !ok { break Loop // 結果チャネルが閉じたら終了 } fmt.Println(" -> 結果受信:", res) case <-stopTimer: fmt.Println("\n!!! タイムアウト:処理を強制中断します !!!") cancel() // ここでContextをキャンセル! // ここではbreakせず、ワーカーの終了ログを見るために少し待ちます } } // 終了処理を待つための猶予(実際のアプリではwg.Wait()などで制御) time.Sleep(1 * time.Second) fmt.Println("メインプロセス終了") }
コードのポイント解説
1. select による優先順位の制御
ワーカー内の for select がこのコードの肝です。
select { case <-ctx.Done(): return case job, ok := <-jobs: // ...処理... }
この構造により、 「ジョブを処理している最中でも、ctx.Done()(停止信号)が来たら、次のループで即座に停止できる」 ようになります。単に range jobs でループするだけでは、コンテキストによるキャンセル制御をこれほどきれいに書くことはできません。
2. 並行数の制限
main 関数内で go worker(...) を呼ぶ回数を const numWorkers = 3 で制限しています。ジョブが100個あっても1万個あっても、同時に動くのは常に3つだけ。これによりサーバーのリソースを守ります。
3. Graceful Shutdown(安全な停止)
cancel() を呼び出すことで、すべてのワーカーが一斉に case <-ctx.Done(): に入り、安全に終了処理(クリーンアップ)を行ってから return することができます。
まとめ
for select パターンをワーカープールに適用することで、以下のメリットが得られます。
- リソース管理: ゴルーチンの数を一定に保てる。
- キャンセル制御: 処理全体をいつでも安全に停止できる。
- 拡張性: ジョブの投入側と処理側を完全に分離(Decouple)できる。
Go言語の並行処理の強みは、こうしたパターンを標準ライブラリだけでシンプルに書ける点にあります。ぜひ実際のツール開発などで活用してみてください。
次のアクション
コードに「エラーハンドリング」を追加する方法 taglibrary.hatenablog.com




