--- contrib/xz/src/liblzma/common/stream_decoder_mt.c.orig +++ contrib/xz/src/liblzma/common/stream_decoder_mt.c @@ -23,15 +23,10 @@ THR_IDLE, /// Decoding is in progress. - /// Main thread may change this to THR_STOP or THR_EXIT. + /// Main thread may change this to THR_IDLE or THR_EXIT. /// The worker thread may change this to THR_IDLE. THR_RUN, - /// The main thread wants the thread to stop whatever it was doing - /// but not exit. Main thread may change this to THR_EXIT. - /// The worker thread may change this to THR_IDLE. - THR_STOP, - /// The main thread wants the thread to exit. THR_EXIT, @@ -346,27 +341,6 @@ } -/// Things do to at THR_STOP or when finishing a Block. -/// This is called with thr->mutex locked. -static void -worker_stop(struct worker_thread *thr) -{ - // Update memory usage counters. - thr->coder->mem_in_use -= thr->in_size; - thr->in_size = 0; // thr->in was freed above. - - thr->coder->mem_in_use -= thr->mem_filters; - thr->coder->mem_cached += thr->mem_filters; - - // Put this thread to the stack of free threads. - thr->next = thr->coder->threads_free; - thr->coder->threads_free = thr; - - mythread_cond_signal(&thr->coder->cond); - return; -} - - static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr) { @@ -397,17 +371,6 @@ return MYTHREAD_RET_VALUE; } - if (thr->state == THR_STOP) { - thr->state = THR_IDLE; - mythread_mutex_unlock(&thr->mutex); - - mythread_sync(thr->coder->mutex) { - worker_stop(thr); - } - - goto next_loop_lock; - } - assert(thr->state == THR_RUN); // Update progress info for get_progress(). @@ -472,8 +435,7 @@ } // Either we finished successfully (LZMA_STREAM_END) or an error - // occurred. Both cases are handled almost identically. The error - // case requires updating thr->coder->thread_error. + // occurred. // // The sizes are in the Block Header and the Block decoder // checks that they match, thus we know these: @@ -481,16 +443,30 @@ assert(ret != LZMA_STREAM_END || thr->out_pos == thr->block_options.uncompressed_size); - // Free the input buffer. Don't update in_size as we need - // it later to update thr->coder->mem_in_use. - lzma_free(thr->in, thr->allocator); - thr->in = NULL; - mythread_sync(thr->mutex) { + // Block decoder ensures this, but do a sanity check anyway + // because thr->in_filled < thr->in_size means that the main + // thread is still writing to thr->in. + if (ret == LZMA_STREAM_END && thr->in_filled != thr->in_size) { + assert(0); + ret = LZMA_PROG_ERROR; + } + if (thr->state != THR_EXIT) thr->state = THR_IDLE; } + // Free the input buffer. Don't update in_size as we need + // it later to update thr->coder->mem_in_use. + // + // This step is skipped if an error occurred because the main thread + // might still be writing to thr->in. The memory will be freed after + // threads_end() sets thr->state = THR_EXIT. + if (ret == LZMA_STREAM_END) { + lzma_free(thr->in, thr->allocator); + thr->in = NULL; + } + mythread_sync(thr->coder->mutex) { // Move our progress info to the main thread. thr->coder->progress_in += thr->in_pos; @@ -510,7 +486,20 @@ && thr->coder->thread_error == LZMA_OK) thr->coder->thread_error = ret; - worker_stop(thr); + // Return the worker thread to the stack of available + // threads only if no errors occurred. + if (ret == LZMA_STREAM_END) { + // Update memory usage counters. + thr->coder->mem_in_use -= thr->in_size; + thr->coder->mem_in_use -= thr->mem_filters; + thr->coder->mem_cached += thr->mem_filters; + + // Put this thread to the stack of free threads. + thr->next = thr->coder->threads_free; + thr->coder->threads_free = thr; + } + + mythread_cond_signal(&thr->coder->cond); } goto next_loop_lock; @@ -544,17 +533,22 @@ } +/// Tell worker threads to stop without doing any cleaning up. +/// The clean up will be done when threads_exit() is called; +/// it's not possible to reuse the threads after threads_stop(). +/// +/// This is called before returning an unrecoverable error code +/// to the application. It would be waste of processor time +/// to keep the threads running in such a situation. static void threads_stop(struct lzma_stream_coder *coder) { for (uint32_t i = 0; i < coder->threads_initialized; ++i) { + // The threads that are in the THR_RUN state will stop + // when they check the state the next time. There's no + // need to signal coder->threads[i].cond. mythread_sync(coder->threads[i].mutex) { - // The state must be changed conditionally because - // THR_IDLE -> THR_STOP is not a valid state change. - if (coder->threads[i].state != THR_IDLE) { - coder->threads[i].state = THR_STOP; - mythread_cond_signal(&coder->threads[i].cond); - } + coder->threads[i].state = THR_IDLE; } } @@ -1561,6 +1555,10 @@ } // Return if the input didn't contain the whole Block. + // + // NOTE: When we updated coder->thr->in_filled a few lines + // above, the worker thread might by now have finished its + // work and returned itself back to the stack of free threads. if (coder->thr->in_filled < coder->thr->in_size) { assert(*in_pos == in_size); return LZMA_OK; @@ -1948,7 +1946,7 @@ // accounting from scratch, too. Changes in filter and block sizes may // affect number of threads. // - // FIXME? Reusing should be easy but unlike the single-threaded + // Reusing threads doesn't seem worth it. Unlike the single-threaded // decoder, with some types of input file combinations reusing // could leave quite a lot of memory allocated but unused (first // file could allocate a lot, the next files could use fewer