libavcodec/pthread.c
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2004 Roman Shaposhnik
00003  * Copyright (c) 2008 Alexander Strange (astrange@ithinksw.com)
00004  *
00005  * Many thanks to Steven M. Schultz for providing clever ideas and
00006  * to Michael Niedermayer <michaelni@gmx.at> for writing initial
00007  * implementation.
00008  *
00009  * This file is part of FFmpeg.
00010  *
00011  * FFmpeg is free software; you can redistribute it and/or
00012  * modify it under the terms of the GNU Lesser General Public
00013  * License as published by the Free Software Foundation; either
00014  * version 2.1 of the License, or (at your option) any later version.
00015  *
00016  * FFmpeg is distributed in the hope that it will be useful,
00017  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00019  * Lesser General Public License for more details.
00020  *
00021  * You should have received a copy of the GNU Lesser General Public
00022  * License along with FFmpeg; if not, write to the Free Software
00023  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
00024  */
00025 
00032 #include "config.h"
00033 
00034 #if HAVE_SCHED_GETAFFINITY
00035 #define _GNU_SOURCE
00036 #include <sched.h>
00037 #endif
00038 #if HAVE_GETPROCESSAFFINITYMASK
00039 #include <windows.h>
00040 #endif
00041 #if HAVE_SYSCTL
00042 #if HAVE_SYS_PARAM_H
00043 #include <sys/param.h>
00044 #endif
00045 #include <sys/types.h>
00046 #include <sys/param.h>
00047 #include <sys/sysctl.h>
00048 #endif
00049 #if HAVE_SYSCONF
00050 #include <unistd.h>
00051 #endif
00052 
00053 #include "avcodec.h"
00054 #include "internal.h"
00055 #include "thread.h"
00056 
00057 #if HAVE_PTHREADS
00058 #include <pthread.h>
00059 #elif HAVE_W32THREADS
00060 #include "w32pthreads.h"
00061 #elif HAVE_OS2THREADS
00062 #include "os2threads.h"
00063 #endif
00064 
00065 typedef int (action_func)(AVCodecContext *c, void *arg); /* job callback taking a pointer to its own argument element */
00066 typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr); /* job callback taking the shared argument plus job and thread numbers */
00067 
00068 typedef struct ThreadContext { /* shared state of the worker pool driven by worker() and avcodec_thread_execute() */
00069     pthread_t *workers;        ///< thread handles, one per worker, filled in by thread_init()
00070     action_func *func;         ///< callback for the current batch; when NULL, func2 is called instead
00071     action_func2 *func2;       ///< alternative callback used when func is NULL
00072     void *args;                ///< argument base pointer for the current batch
00073     int *rets;                 ///< where job return values are stored, indexed by job % rets_count
00074     int rets_count;            ///< number of usable entries in rets
00075     int job_count;             ///< number of jobs in the current batch
00076     int job_size;              ///< byte stride between per-job arguments when func is used
00077 
00078     pthread_cond_t last_job_cond;     ///< signalled by a worker once current_job == thread_count + job_count
00079     pthread_cond_t current_job_cond;  ///< broadcast to wake workers for a new batch or for shutdown
00080     pthread_mutex_t current_job_lock; ///< held while current_job is updated and while waiting on either condition
00081     int current_job;           ///< counter workers post-increment to obtain their id and subsequent job numbers
00082     int done;                  ///< set by thread_free() to make workers return
00083 } ThreadContext;
00084 
00086 #define MAX_BUFFERS (32+1) /* capacity of the per-thread released_buffers and progress arrays */
00087 
00091 typedef struct PerThreadContext { /* state owned by one frame-decoding worker thread */
00092     struct FrameThreadContext *parent; ///< owning frame-threading context
00093 
00094     pthread_t      thread;          ///< handle of the worker running frame_worker_thread()
00095     int            thread_init;     ///< non-zero once pthread_create() succeeded for this slot
00096     pthread_cond_t input_cond;      ///< the worker waits on this (with mutex) for a submitted packet
00097     pthread_cond_t progress_cond;   ///< broadcast (with progress_mutex) on progress or state changes
00098     pthread_cond_t output_cond;     ///< the main thread waits on this for the worker to finish a packet
00099 
00100     pthread_mutex_t mutex;          ///< held by the worker while decoding and by submit_packet() while handing over input
00101     pthread_mutex_t progress_mutex; ///< used with progress_cond and output_cond
00102 
00103     AVCodecContext *avctx;          ///< private codec context this thread decodes with
00104 
00105     AVPacket       avpkt;           ///< private copy of the packet to decode
00106     int            allocated_buf_size; ///< size bookkeeping for avpkt.data, maintained by av_fast_malloc()
00107 
00108     AVFrame frame;                  ///< frame filled in by the codec's decode()
00109     int     got_frame;              ///< got-frame flag reported by the last decode() call
00110     int     result;                 ///< return value of the last decode() or relayed get_buffer() call
00111 
00112     enum {
00113         STATE_INPUT_READY,          ///< idle: waiting for submit_packet()
00114         STATE_SETTING_UP,           ///< packet submitted, ff_thread_finish_setup() not yet called
00115         STATE_GET_BUFFER,           ///< worker asked the main thread to call get_buffer(); reset to STATE_SETTING_UP afterwards
00119         STATE_SETUP_FINISHED        ///< ff_thread_finish_setup() has been called
00120     } state;
00121 
00126     AVFrame released_buffers[MAX_BUFFERS]; ///< frames queued by ff_thread_release_buffer(), released in release_delayed_buffers()
00127     int     num_released_buffers;          ///< number of valid entries in released_buffers
00128 
00132     int     progress[MAX_BUFFERS][2];      ///< progress slots handed out by allocate_progress(), two values (one per field index) each
00133     uint8_t progress_used[MAX_BUFFERS];    ///< non-zero for progress slots currently in use
00134 
00135     AVFrame *requested_frame;       ///< frame the worker wants get_buffer() called on by the main thread
00136 } PerThreadContext;
00137 
00141 typedef struct FrameThreadContext { /* state shared by all frame-decoding worker threads */
00142     PerThreadContext *threads;     ///< array of per-thread contexts, thread_count entries
00143     PerThreadContext *prev_thread; ///< thread the most recent packet was submitted to, or NULL
00144 
00145     pthread_mutex_t buffer_mutex;  ///< held around progress-slot allocation and get/release_buffer() calls
00146 
00147     int next_decoding;             ///< index of the thread that receives the next packet
00148     int next_finished;             ///< index of the thread whose output is returned next
00149 
00150     int delaying;                  ///< set while the first packets are queued; no frame is returned while it is set
00155     int die;                       ///< set by frame_thread_free() to make the workers exit
00156 } FrameThreadContext;
00157 
00158 
00159 /* H264 slice threading seems to be buggy with more than 16 threads,
00160  * limit the number of threads to 16 for automatic detection */
00161 #define MAX_AUTO_THREADS 16
00162 
00163 static int get_logical_cpus(AVCodecContext *avctx)
00164 {
00165     int ret, nb_cpus = 1;
00166 #if HAVE_SCHED_GETAFFINITY && defined(CPU_COUNT)
00167     cpu_set_t cpuset;
00168 
00169     CPU_ZERO(&cpuset);
00170 
00171     ret = sched_getaffinity(0, sizeof(cpuset), &cpuset);
00172     if (!ret) {
00173         nb_cpus = CPU_COUNT(&cpuset);
00174     }
00175 #elif HAVE_GETPROCESSAFFINITYMASK
00176     DWORD_PTR proc_aff, sys_aff;
00177     ret = GetProcessAffinityMask(GetCurrentProcess(), &proc_aff, &sys_aff);
00178     if (ret)
00179         nb_cpus = av_popcount64(proc_aff);
00180 #elif HAVE_SYSCTL && defined(HW_NCPU)
00181     int mib[2] = { CTL_HW, HW_NCPU };
00182     size_t len = sizeof(nb_cpus);
00183 
00184     ret = sysctl(mib, 2, &nb_cpus, &len, NULL, 0);
00185     if (ret == -1)
00186         nb_cpus = 0;
00187 #elif HAVE_SYSCONF && defined(_SC_NPROC_ONLN)
00188     nb_cpus = sysconf(_SC_NPROC_ONLN);
00189 #elif HAVE_SYSCONF && defined(_SC_NPROCESSORS_ONLN)
00190     nb_cpus = sysconf(_SC_NPROCESSORS_ONLN);
00191 #endif
00192     av_log(avctx, AV_LOG_DEBUG, "detected %d logical cores\n", nb_cpus);
00193 
00194     if  (avctx->height)
00195         nb_cpus = FFMIN(nb_cpus, (avctx->height+15)/16);
00196 
00197     return nb_cpus;
00198 }
00199 
00200 
00201 static void* attribute_align_arg worker(void *v)
00202 {
00203     AVCodecContext *avctx = v;
00204     ThreadContext *c = avctx->thread_opaque;
00205     int our_job = c->job_count;
00206     int thread_count = avctx->thread_count;
00207     int self_id;
00208 
00209     pthread_mutex_lock(&c->current_job_lock);
00210     self_id = c->current_job++;
00211     for (;;){
00212         while (our_job >= c->job_count) {
00213             if (c->current_job == thread_count + c->job_count)
00214                 pthread_cond_signal(&c->last_job_cond);
00215 
00216             pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
00217             our_job = self_id;
00218 
00219             if (c->done) {
00220                 pthread_mutex_unlock(&c->current_job_lock);
00221                 return NULL;
00222             }
00223         }
00224         pthread_mutex_unlock(&c->current_job_lock);
00225 
00226         c->rets[our_job%c->rets_count] = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
00227                                                    c->func2(avctx, c->args, our_job, self_id);
00228 
00229         pthread_mutex_lock(&c->current_job_lock);
00230         our_job = c->current_job++;
00231     }
00232 }
00233 
00234 static av_always_inline void avcodec_thread_park_workers(ThreadContext *c, int thread_count)
00235 {
00236     pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
00237     pthread_mutex_unlock(&c->current_job_lock);
00238 }
00239 
00240 static void thread_free(AVCodecContext *avctx)
00241 {
00242     ThreadContext *c = avctx->thread_opaque;
00243     int i;
00244 
00245     pthread_mutex_lock(&c->current_job_lock);
00246     c->done = 1;
00247     pthread_cond_broadcast(&c->current_job_cond);
00248     pthread_mutex_unlock(&c->current_job_lock);
00249 
00250     for (i=0; i<avctx->thread_count; i++)
00251          pthread_join(c->workers[i], NULL);
00252 
00253     pthread_mutex_destroy(&c->current_job_lock);
00254     pthread_cond_destroy(&c->current_job_cond);
00255     pthread_cond_destroy(&c->last_job_cond);
00256     av_free(c->workers);
00257     av_freep(&avctx->thread_opaque);
00258 }
00259 
00260 static int avcodec_thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
00261 {
00262     ThreadContext *c= avctx->thread_opaque;
00263     int dummy_ret;
00264 
00265     if (!(avctx->active_thread_type&FF_THREAD_SLICE) || avctx->thread_count <= 1)
00266         return avcodec_default_execute(avctx, func, arg, ret, job_count, job_size);
00267 
00268     if (job_count <= 0)
00269         return 0;
00270 
00271     pthread_mutex_lock(&c->current_job_lock);
00272 
00273     c->current_job = avctx->thread_count;
00274     c->job_count = job_count;
00275     c->job_size = job_size;
00276     c->args = arg;
00277     c->func = func;
00278     if (ret) {
00279         c->rets = ret;
00280         c->rets_count = job_count;
00281     } else {
00282         c->rets = &dummy_ret;
00283         c->rets_count = 1;
00284     }
00285     pthread_cond_broadcast(&c->current_job_cond);
00286 
00287     avcodec_thread_park_workers(c, avctx->thread_count);
00288 
00289     return 0;
00290 }
00291 
00292 static int avcodec_thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count)
00293 {
00294     ThreadContext *c= avctx->thread_opaque;
00295     c->func2 = func2;
00296     return avcodec_thread_execute(avctx, NULL, arg, ret, job_count, 0);
00297 }
00298 
00299 static int thread_init(AVCodecContext *avctx)
00300 {
00301     int i;
00302     ThreadContext *c;
00303     int thread_count = avctx->thread_count;
00304 
00305     if (!thread_count) {
00306         int nb_cpus = get_logical_cpus(avctx);
00307         // use number of cores + 1 as thread count if there is more than one
00308         if (nb_cpus > 1)
00309             thread_count = avctx->thread_count = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS);
00310         else
00311             thread_count = avctx->thread_count = 1;
00312     }
00313 
00314     if (thread_count <= 1) {
00315         avctx->active_thread_type = 0;
00316         return 0;
00317     }
00318 
00319     c = av_mallocz(sizeof(ThreadContext));
00320     if (!c)
00321         return -1;
00322 
00323     c->workers = av_mallocz(sizeof(pthread_t)*thread_count);
00324     if (!c->workers) {
00325         av_free(c);
00326         return -1;
00327     }
00328 
00329     avctx->thread_opaque = c;
00330     c->current_job = 0;
00331     c->job_count = 0;
00332     c->job_size = 0;
00333     c->done = 0;
00334     pthread_cond_init(&c->current_job_cond, NULL);
00335     pthread_cond_init(&c->last_job_cond, NULL);
00336     pthread_mutex_init(&c->current_job_lock, NULL);
00337     pthread_mutex_lock(&c->current_job_lock);
00338     for (i=0; i<thread_count; i++) {
00339         if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
00340            avctx->thread_count = i;
00341            pthread_mutex_unlock(&c->current_job_lock);
00342            ff_thread_free(avctx);
00343            return -1;
00344         }
00345     }
00346 
00347     avcodec_thread_park_workers(c, thread_count);
00348 
00349     avctx->execute = avcodec_thread_execute;
00350     avctx->execute2 = avcodec_thread_execute2;
00351     return 0;
00352 }
00353 
00361 static attribute_align_arg void *frame_worker_thread(void *arg)
00362 {
00363     PerThreadContext *p = arg;
00364     FrameThreadContext *fctx = p->parent;
00365     AVCodecContext *avctx = p->avctx;
00366     AVCodec *codec = avctx->codec;
00367 
00368     while (1) {
00369         int i;
00370         if (p->state == STATE_INPUT_READY && !fctx->die) {
00371             pthread_mutex_lock(&p->mutex);
00372             while (p->state == STATE_INPUT_READY && !fctx->die)
00373                 pthread_cond_wait(&p->input_cond, &p->mutex);
00374             pthread_mutex_unlock(&p->mutex);
00375         }
00376 
00377         if (fctx->die) break;
00378 
00379         if (!codec->update_thread_context && (avctx->thread_safe_callbacks || avctx->get_buffer == avcodec_default_get_buffer))
00380             ff_thread_finish_setup(avctx);
00381 
00382         pthread_mutex_lock(&p->mutex);
00383         avcodec_get_frame_defaults(&p->frame);
00384         p->got_frame = 0;
00385         p->result = codec->decode(avctx, &p->frame, &p->got_frame, &p->avpkt);
00386 
00387         if (p->state == STATE_SETTING_UP) ff_thread_finish_setup(avctx);
00388 
00389         p->state = STATE_INPUT_READY;
00390 
00391         pthread_mutex_lock(&p->progress_mutex);
00392         for (i = 0; i < MAX_BUFFERS; i++)
00393             if (p->progress_used[i]) {
00394                 p->progress[i][0] = INT_MAX;
00395                 p->progress[i][1] = INT_MAX;
00396             }
00397         pthread_cond_broadcast(&p->progress_cond);
00398         pthread_cond_signal(&p->output_cond);
00399         pthread_mutex_unlock(&p->progress_mutex);
00400 
00401         pthread_mutex_unlock(&p->mutex);
00402     }
00403 
00404     return NULL;
00405 }
00406 
00414 static int update_context_from_thread(AVCodecContext *dst, AVCodecContext *src, int for_user)
00415 {
00416     int err = 0;
00417 
00418     if (dst != src) {
00419         dst->sub_id    = src->sub_id;
00420         dst->time_base = src->time_base;
00421         dst->width     = src->width;
00422         dst->height    = src->height;
00423         dst->pix_fmt   = src->pix_fmt;
00424 
00425         dst->coded_width  = src->coded_width;
00426         dst->coded_height = src->coded_height;
00427 
00428         dst->has_b_frames = src->has_b_frames;
00429         dst->idct_algo    = src->idct_algo;
00430 
00431         dst->bits_per_coded_sample = src->bits_per_coded_sample;
00432         dst->sample_aspect_ratio   = src->sample_aspect_ratio;
00433         dst->dtg_active_format     = src->dtg_active_format;
00434 
00435         dst->profile = src->profile;
00436         dst->level   = src->level;
00437 
00438         dst->bits_per_raw_sample = src->bits_per_raw_sample;
00439         dst->ticks_per_frame     = src->ticks_per_frame;
00440         dst->color_primaries     = src->color_primaries;
00441 
00442         dst->color_trc   = src->color_trc;
00443         dst->colorspace  = src->colorspace;
00444         dst->color_range = src->color_range;
00445         dst->chroma_sample_location = src->chroma_sample_location;
00446     }
00447 
00448     if (for_user) {
00449         dst->delay       = src->thread_count - 1;
00450         dst->coded_frame = src->coded_frame;
00451     } else {
00452         if (dst->codec->update_thread_context)
00453             err = dst->codec->update_thread_context(dst, src);
00454     }
00455 
00456     return err;
00457 }
00458 
00466 static int update_context_from_user(AVCodecContext *dst, AVCodecContext *src)
00467 {
00468 #define copy_fields(s, e) memcpy(&dst->s, &src->s, (char*)&dst->e - (char*)&dst->s);
00469     dst->flags          = src->flags;
00470 
00471     dst->draw_horiz_band= src->draw_horiz_band;
00472     dst->get_buffer     = src->get_buffer;
00473     dst->release_buffer = src->release_buffer;
00474 
00475     dst->opaque   = src->opaque;
00476     dst->dsp_mask = src->dsp_mask;
00477     dst->debug    = src->debug;
00478     dst->debug_mv = src->debug_mv;
00479 
00480     dst->slice_flags = src->slice_flags;
00481     dst->flags2      = src->flags2;
00482 
00483     copy_fields(skip_loop_filter, bidir_refine);
00484 
00485     dst->frame_number     = src->frame_number;
00486     dst->reordered_opaque = src->reordered_opaque;
00487     dst->thread_safe_callbacks = src->thread_safe_callbacks;
00488 
00489     if (src->slice_count && src->slice_offset) {
00490         if (dst->slice_count < src->slice_count) {
00491             int *tmp = av_realloc(dst->slice_offset, src->slice_count *
00492                                   sizeof(*dst->slice_offset));
00493             if (!tmp) {
00494                 av_free(dst->slice_offset);
00495                 return AVERROR(ENOMEM);
00496             }
00497             dst->slice_offset = tmp;
00498         }
00499         memcpy(dst->slice_offset, src->slice_offset,
00500                src->slice_count * sizeof(*dst->slice_offset));
00501     }
00502     dst->slice_count = src->slice_count;
00503     return 0;
00504 #undef copy_fields
00505 }
00506 
00507 static void free_progress(AVFrame *f)
00508 {
00509     PerThreadContext *p = f->owner->thread_opaque;
00510     int *progress = f->thread_opaque;
00511 
00512     p->progress_used[(progress - p->progress[0]) / 2] = 0;
00513 }
00514 
00516 static void release_delayed_buffers(PerThreadContext *p)
00517 {
00518     FrameThreadContext *fctx = p->parent;
00519 
00520     while (p->num_released_buffers > 0) {
00521         AVFrame *f;
00522 
00523         pthread_mutex_lock(&fctx->buffer_mutex);
00524         f = &p->released_buffers[--p->num_released_buffers];
00525         free_progress(f);
00526         f->thread_opaque = NULL;
00527 
00528         f->owner->release_buffer(f->owner, f);
00529         pthread_mutex_unlock(&fctx->buffer_mutex);
00530     }
00531 }
00532 
00533 static int submit_packet(PerThreadContext *p, AVPacket *avpkt)
00534 {
00535     FrameThreadContext *fctx = p->parent;
00536     PerThreadContext *prev_thread = fctx->prev_thread;
00537     AVCodec *codec = p->avctx->codec;
00538     uint8_t *buf = p->avpkt.data;
00539 
00540     if (!avpkt->size && !(codec->capabilities & CODEC_CAP_DELAY)) return 0;
00541 
00542     pthread_mutex_lock(&p->mutex);
00543 
00544     release_delayed_buffers(p);
00545 
00546     if (prev_thread) {
00547         int err;
00548         if (prev_thread->state == STATE_SETTING_UP) {
00549             pthread_mutex_lock(&prev_thread->progress_mutex);
00550             while (prev_thread->state == STATE_SETTING_UP)
00551                 pthread_cond_wait(&prev_thread->progress_cond, &prev_thread->progress_mutex);
00552             pthread_mutex_unlock(&prev_thread->progress_mutex);
00553         }
00554 
00555         err = update_context_from_thread(p->avctx, prev_thread->avctx, 0);
00556         if (err) {
00557             pthread_mutex_unlock(&p->mutex);
00558             return err;
00559         }
00560     }
00561 
00562     av_fast_malloc(&buf, &p->allocated_buf_size, avpkt->size + FF_INPUT_BUFFER_PADDING_SIZE);
00563     p->avpkt = *avpkt;
00564     p->avpkt.data = buf;
00565     memcpy(buf, avpkt->data, avpkt->size);
00566     memset(buf + avpkt->size, 0, FF_INPUT_BUFFER_PADDING_SIZE);
00567 
00568     p->state = STATE_SETTING_UP;
00569     pthread_cond_signal(&p->input_cond);
00570     pthread_mutex_unlock(&p->mutex);
00571 
00572     /*
00573      * If the client doesn't have a thread-safe get_buffer(),
00574      * then decoding threads call back to the main thread,
00575      * and it calls back to the client here.
00576      */
00577 
00578     if (!p->avctx->thread_safe_callbacks &&
00579          p->avctx->get_buffer != avcodec_default_get_buffer) {
00580         while (p->state != STATE_SETUP_FINISHED && p->state != STATE_INPUT_READY) {
00581             pthread_mutex_lock(&p->progress_mutex);
00582             while (p->state == STATE_SETTING_UP)
00583                 pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
00584 
00585             if (p->state == STATE_GET_BUFFER) {
00586                 p->result = p->avctx->get_buffer(p->avctx, p->requested_frame);
00587                 p->state  = STATE_SETTING_UP;
00588                 pthread_cond_signal(&p->progress_cond);
00589             }
00590             pthread_mutex_unlock(&p->progress_mutex);
00591         }
00592     }
00593 
00594     fctx->prev_thread = p;
00595     fctx->next_decoding++;
00596 
00597     return 0;
00598 }
00599 
00600 int ff_thread_decode_frame(AVCodecContext *avctx,
00601                            AVFrame *picture, int *got_picture_ptr,
00602                            AVPacket *avpkt)
00603 {
00604     FrameThreadContext *fctx = avctx->thread_opaque;
00605     int finished = fctx->next_finished;
00606     PerThreadContext *p;
00607     int err;
00608 
00609     /*
00610      * Submit a packet to the next decoding thread.
00611      */
00612 
00613     p = &fctx->threads[fctx->next_decoding];
00614     err = update_context_from_user(p->avctx, avctx);
00615     if (err) return err;
00616     err = submit_packet(p, avpkt);
00617     if (err) return err;
00618 
00619     /*
00620      * If we're still receiving the initial packets, don't return a frame.
00621      */
00622 
00623     if (fctx->delaying && avpkt->size) {
00624         if (fctx->next_decoding >= (avctx->thread_count-1)) fctx->delaying = 0;
00625 
00626         *got_picture_ptr=0;
00627         return avpkt->size;
00628     }
00629 
00630     /*
00631      * Return the next available frame from the oldest thread.
00632      * If we're at the end of the stream, then we have to skip threads that
00633      * didn't output a frame, because we don't want to accidentally signal
00634      * EOF (avpkt->size == 0 && *got_picture_ptr == 0).
00635      */
00636 
00637     do {
00638         p = &fctx->threads[finished++];
00639 
00640         if (p->state != STATE_INPUT_READY) {
00641             pthread_mutex_lock(&p->progress_mutex);
00642             while (p->state != STATE_INPUT_READY)
00643                 pthread_cond_wait(&p->output_cond, &p->progress_mutex);
00644             pthread_mutex_unlock(&p->progress_mutex);
00645         }
00646 
00647         *picture = p->frame;
00648         *got_picture_ptr = p->got_frame;
00649         picture->pkt_dts = p->avpkt.dts;
00650         picture->sample_aspect_ratio = avctx->sample_aspect_ratio;
00651         picture->width  = avctx->width;
00652         picture->height = avctx->height;
00653         picture->format = avctx->pix_fmt;
00654 
00655         /*
00656          * A later call with avkpt->size == 0 may loop over all threads,
00657          * including this one, searching for a frame to return before being
00658          * stopped by the "finished != fctx->next_finished" condition.
00659          * Make sure we don't mistakenly return the same frame again.
00660          */
00661         p->got_frame = 0;
00662 
00663         if (finished >= avctx->thread_count) finished = 0;
00664     } while (!avpkt->size && !*got_picture_ptr && finished != fctx->next_finished);
00665 
00666     update_context_from_thread(avctx, p->avctx, 1);
00667 
00668     if (fctx->next_decoding >= avctx->thread_count) fctx->next_decoding = 0;
00669 
00670     fctx->next_finished = finished;
00671 
00672     /* return the size of the consumed packet if no error occurred */
00673     return (p->result >= 0) ? avpkt->size : p->result;
00674 }
00675 
00676 void ff_thread_report_progress(AVFrame *f, int n, int field)
00677 {
00678     PerThreadContext *p;
00679     int *progress = f->thread_opaque;
00680 
00681     if (!progress || progress[field] >= n) return;
00682 
00683     p = f->owner->thread_opaque;
00684 
00685     if (f->owner->debug&FF_DEBUG_THREADS)
00686         av_log(f->owner, AV_LOG_DEBUG, "%p finished %d field %d\n", progress, n, field);
00687 
00688     pthread_mutex_lock(&p->progress_mutex);
00689     progress[field] = n;
00690     pthread_cond_broadcast(&p->progress_cond);
00691     pthread_mutex_unlock(&p->progress_mutex);
00692 }
00693 
00694 void ff_thread_await_progress(AVFrame *f, int n, int field)
00695 {
00696     PerThreadContext *p;
00697     int *progress = f->thread_opaque;
00698 
00699     if (!progress || progress[field] >= n) return;
00700 
00701     p = f->owner->thread_opaque;
00702 
00703     if (f->owner->debug&FF_DEBUG_THREADS)
00704         av_log(f->owner, AV_LOG_DEBUG, "thread awaiting %d field %d from %p\n", n, field, progress);
00705 
00706     pthread_mutex_lock(&p->progress_mutex);
00707     while (progress[field] < n)
00708         pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
00709     pthread_mutex_unlock(&p->progress_mutex);
00710 }
00711 
00712 void ff_thread_finish_setup(AVCodecContext *avctx) {
00713     PerThreadContext *p = avctx->thread_opaque;
00714 
00715     if (!(avctx->active_thread_type&FF_THREAD_FRAME)) return;
00716 
00717     if(p->state == STATE_SETUP_FINISHED){
00718         av_log(avctx, AV_LOG_WARNING, "Multiple ff_thread_finish_setup() calls\n");
00719     }
00720 
00721     pthread_mutex_lock(&p->progress_mutex);
00722     p->state = STATE_SETUP_FINISHED;
00723     pthread_cond_broadcast(&p->progress_cond);
00724     pthread_mutex_unlock(&p->progress_mutex);
00725 }
00726 
00728 static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count)
00729 {
00730     int i;
00731 
00732     for (i = 0; i < thread_count; i++) {
00733         PerThreadContext *p = &fctx->threads[i];
00734 
00735         if (p->state != STATE_INPUT_READY) {
00736             pthread_mutex_lock(&p->progress_mutex);
00737             while (p->state != STATE_INPUT_READY)
00738                 pthread_cond_wait(&p->output_cond, &p->progress_mutex);
00739             pthread_mutex_unlock(&p->progress_mutex);
00740         }
00741         p->got_frame = 0;
00742     }
00743 }
00744 
00745 static void frame_thread_free(AVCodecContext *avctx, int thread_count)
00746 {
00747     FrameThreadContext *fctx = avctx->thread_opaque;
00748     AVCodec *codec = avctx->codec;
00749     int i;
00750 
00751     park_frame_worker_threads(fctx, thread_count);
00752 
00753     if (fctx->prev_thread && fctx->prev_thread != fctx->threads)
00754         update_context_from_thread(fctx->threads->avctx, fctx->prev_thread->avctx, 0);
00755 
00756     fctx->die = 1;
00757 
00758     for (i = 0; i < thread_count; i++) {
00759         PerThreadContext *p = &fctx->threads[i];
00760 
00761         pthread_mutex_lock(&p->mutex);
00762         pthread_cond_signal(&p->input_cond);
00763         pthread_mutex_unlock(&p->mutex);
00764 
00765         if (p->thread_init)
00766             pthread_join(p->thread, NULL);
00767         p->thread_init=0;
00768 
00769         if (codec->close)
00770             codec->close(p->avctx);
00771 
00772         avctx->codec = NULL;
00773 
00774         release_delayed_buffers(p);
00775     }
00776 
00777     for (i = 0; i < thread_count; i++) {
00778         PerThreadContext *p = &fctx->threads[i];
00779 
00780         avcodec_default_free_buffers(p->avctx);
00781 
00782         pthread_mutex_destroy(&p->mutex);
00783         pthread_mutex_destroy(&p->progress_mutex);
00784         pthread_cond_destroy(&p->input_cond);
00785         pthread_cond_destroy(&p->progress_cond);
00786         pthread_cond_destroy(&p->output_cond);
00787         av_freep(&p->avpkt.data);
00788 
00789         if (i) {
00790             av_freep(&p->avctx->priv_data);
00791             av_freep(&p->avctx->internal);
00792             av_freep(&p->avctx->slice_offset);
00793         }
00794 
00795         av_freep(&p->avctx);
00796     }
00797 
00798     av_freep(&fctx->threads);
00799     pthread_mutex_destroy(&fctx->buffer_mutex);
00800     av_freep(&avctx->thread_opaque);
00801 }
00802 
00803 static int frame_thread_init(AVCodecContext *avctx)
00804 {
00805     int thread_count = avctx->thread_count;
00806     AVCodec *codec = avctx->codec;
00807     AVCodecContext *src = avctx;
00808     FrameThreadContext *fctx;
00809     int i, err = 0;
00810 
00811     if (!thread_count) {
00812         int nb_cpus = get_logical_cpus(avctx);
00813         if ((avctx->debug & (FF_DEBUG_VIS_QP | FF_DEBUG_VIS_MB_TYPE)) || avctx->debug_mv)
00814             nb_cpus = 1;
00815         // use number of cores + 1 as thread count if there is more than one
00816         if (nb_cpus > 1)
00817             thread_count = avctx->thread_count = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS);
00818         else
00819             thread_count = avctx->thread_count = 1;
00820     }
00821 
00822     if (thread_count <= 1) {
00823         avctx->active_thread_type = 0;
00824         return 0;
00825     }
00826 
00827     avctx->thread_opaque = fctx = av_mallocz(sizeof(FrameThreadContext));
00828 
00829     fctx->threads = av_mallocz(sizeof(PerThreadContext) * thread_count);
00830     pthread_mutex_init(&fctx->buffer_mutex, NULL);
00831     fctx->delaying = 1;
00832 
00833     for (i = 0; i < thread_count; i++) {
00834         AVCodecContext *copy = av_malloc(sizeof(AVCodecContext));
00835         PerThreadContext *p  = &fctx->threads[i];
00836 
00837         pthread_mutex_init(&p->mutex, NULL);
00838         pthread_mutex_init(&p->progress_mutex, NULL);
00839         pthread_cond_init(&p->input_cond, NULL);
00840         pthread_cond_init(&p->progress_cond, NULL);
00841         pthread_cond_init(&p->output_cond, NULL);
00842 
00843         p->parent = fctx;
00844         p->avctx  = copy;
00845 
00846         if (!copy) {
00847             err = AVERROR(ENOMEM);
00848             goto error;
00849         }
00850 
00851         *copy = *src;
00852         copy->thread_opaque = p;
00853         copy->pkt = &p->avpkt;
00854 
00855         if (!i) {
00856             src = copy;
00857 
00858             if (codec->init)
00859                 err = codec->init(copy);
00860 
00861             update_context_from_thread(avctx, copy, 1);
00862         } else {
00863             copy->priv_data = av_malloc(codec->priv_data_size);
00864             if (!copy->priv_data) {
00865                 err = AVERROR(ENOMEM);
00866                 goto error;
00867             }
00868             memcpy(copy->priv_data, src->priv_data, codec->priv_data_size);
00869             copy->internal = av_malloc(sizeof(AVCodecInternal));
00870             if (!copy->internal) {
00871                 err = AVERROR(ENOMEM);
00872                 goto error;
00873             }
00874             *copy->internal = *src->internal;
00875             copy->internal->is_copy = 1;
00876 
00877             if (codec->init_thread_copy)
00878                 err = codec->init_thread_copy(copy);
00879         }
00880 
00881         if (err) goto error;
00882 
00883         p->thread_init= !pthread_create(&p->thread, NULL, frame_worker_thread, p);
00884         if(!p->thread_init)
00885             goto error;
00886     }
00887 
00888     return 0;
00889 
00890 error:
00891     frame_thread_free(avctx, i+1);
00892 
00893     return err;
00894 }
00895 
00896 void ff_thread_flush(AVCodecContext *avctx)
00897 {
00898     FrameThreadContext *fctx = avctx->thread_opaque;
00899 
00900     if (!avctx->thread_opaque) return;
00901 
00902     park_frame_worker_threads(fctx, avctx->thread_count);
00903     if (fctx->prev_thread) {
00904         if (fctx->prev_thread != &fctx->threads[0])
00905             update_context_from_thread(fctx->threads[0].avctx, fctx->prev_thread->avctx, 0);
00906         if (avctx->codec->flush)
00907             avctx->codec->flush(fctx->threads[0].avctx);
00908     }
00909 
00910     fctx->next_decoding = fctx->next_finished = 0;
00911     fctx->delaying = 1;
00912     fctx->prev_thread = NULL;
00913 }
00914 
00915 static int *allocate_progress(PerThreadContext *p)
00916 {
00917     int i;
00918 
00919     for (i = 0; i < MAX_BUFFERS; i++)
00920         if (!p->progress_used[i]) break;
00921 
00922     if (i == MAX_BUFFERS) {
00923         av_log(p->avctx, AV_LOG_ERROR, "allocate_progress() overflow\n");
00924         return NULL;
00925     }
00926 
00927     p->progress_used[i] = 1;
00928 
00929     return p->progress[i];
00930 }
00931 
00932 int ff_thread_get_buffer(AVCodecContext *avctx, AVFrame *f)
00933 {
00934     PerThreadContext *p = avctx->thread_opaque;
00935     int *progress, err;
00936 
00937     f->owner = avctx;
00938 
00939     ff_init_buffer_info(avctx, f);
00940 
00941     if (!(avctx->active_thread_type&FF_THREAD_FRAME)) {
00942         f->thread_opaque = NULL;
00943         return avctx->get_buffer(avctx, f);
00944     }
00945 
00946     if (p->state != STATE_SETTING_UP &&
00947         (avctx->codec->update_thread_context || (!avctx->thread_safe_callbacks &&
00948                 avctx->get_buffer != avcodec_default_get_buffer))) {
00949         av_log(avctx, AV_LOG_ERROR, "get_buffer() cannot be called after ff_thread_finish_setup()\n");
00950         return -1;
00951     }
00952 
00953     pthread_mutex_lock(&p->parent->buffer_mutex);
00954     f->thread_opaque = progress = allocate_progress(p);
00955 
00956     if (!progress) {
00957         pthread_mutex_unlock(&p->parent->buffer_mutex);
00958         return -1;
00959     }
00960 
00961     progress[0] =
00962     progress[1] = -1;
00963 
00964     if (avctx->thread_safe_callbacks ||
00965         avctx->get_buffer == avcodec_default_get_buffer) {
00966         err = avctx->get_buffer(avctx, f);
00967     } else {
00968         p->requested_frame = f;
00969         p->state = STATE_GET_BUFFER;
00970         pthread_mutex_lock(&p->progress_mutex);
00971         pthread_cond_broadcast(&p->progress_cond);
00972 
00973         while (p->state != STATE_SETTING_UP)
00974             pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
00975 
00976         err = p->result;
00977 
00978         pthread_mutex_unlock(&p->progress_mutex);
00979 
00980         if (!avctx->codec->update_thread_context)
00981             ff_thread_finish_setup(avctx);
00982     }
00983 
00984     pthread_mutex_unlock(&p->parent->buffer_mutex);
00985 
00986     return err;
00987 }
00988 
00989 void ff_thread_release_buffer(AVCodecContext *avctx, AVFrame *f)
00990 {
00991     PerThreadContext *p = avctx->thread_opaque;
00992     FrameThreadContext *fctx;
00993 
00994     if (!(avctx->active_thread_type&FF_THREAD_FRAME)) {
00995         avctx->release_buffer(avctx, f);
00996         return;
00997     }
00998 
00999     if (p->num_released_buffers >= MAX_BUFFERS) {
01000         av_log(p->avctx, AV_LOG_ERROR, "too many thread_release_buffer calls!\n");
01001         return;
01002     }
01003 
01004     if(avctx->debug & FF_DEBUG_BUFFERS)
01005         av_log(avctx, AV_LOG_DEBUG, "thread_release_buffer called on pic %p\n", f);
01006 
01007     fctx = p->parent;
01008     pthread_mutex_lock(&fctx->buffer_mutex);
01009     p->released_buffers[p->num_released_buffers++] = *f;
01010     pthread_mutex_unlock(&fctx->buffer_mutex);
01011     memset(f->data, 0, sizeof(f->data));
01012 }
01013 
01023 static void validate_thread_parameters(AVCodecContext *avctx)
01024 {
01025     int frame_threading_supported = (avctx->codec->capabilities & CODEC_CAP_FRAME_THREADS)
01026                                 && !(avctx->flags & CODEC_FLAG_TRUNCATED)
01027                                 && !(avctx->flags & CODEC_FLAG_LOW_DELAY)
01028                                 && !(avctx->flags2 & CODEC_FLAG2_CHUNKS);
01029     if (avctx->thread_count == 1) {
01030         avctx->active_thread_type = 0;
01031     } else if (frame_threading_supported && (avctx->thread_type & FF_THREAD_FRAME)) {
01032         avctx->active_thread_type = FF_THREAD_FRAME;
01033     } else if (avctx->codec->capabilities & CODEC_CAP_SLICE_THREADS &&
01034                avctx->thread_type & FF_THREAD_SLICE) {
01035         avctx->active_thread_type = FF_THREAD_SLICE;
01036     } else if (!(avctx->codec->capabilities & CODEC_CAP_AUTO_THREADS)) {
01037         avctx->thread_count       = 1;
01038         avctx->active_thread_type = 0;
01039     }
01040 }
01041 
01042 int ff_thread_init(AVCodecContext *avctx)
01043 {
01044     if (avctx->thread_opaque) {
01045         av_log(avctx, AV_LOG_ERROR, "avcodec_thread_init is ignored after avcodec_open\n");
01046         return -1;
01047     }
01048 
01049 #if HAVE_W32THREADS
01050     w32thread_init();
01051 #endif
01052 
01053     if (avctx->codec) {
01054         validate_thread_parameters(avctx);
01055 
01056         if (avctx->active_thread_type&FF_THREAD_SLICE)
01057             return thread_init(avctx);
01058         else if (avctx->active_thread_type&FF_THREAD_FRAME)
01059             return frame_thread_init(avctx);
01060     }
01061 
01062     return 0;
01063 }
01064 
01065 void ff_thread_free(AVCodecContext *avctx)
01066 {
01067     if (avctx->active_thread_type&FF_THREAD_FRAME)
01068         frame_thread_free(avctx, avctx->thread_count);
01069     else
01070         thread_free(avctx);
01071 }