asfdemux: fix performance issue, especially with high-bitrate streams
[glsdk/gst-plugins-ugly0-10.git] / gst / asfdemux / asfpacket.c
1 /* GStreamer ASF/WMV/WMA demuxer
2  * Copyright (C) 2007 Tim-Philipp Müller <tim centricular net>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
20 /* FIXME:
21  *  file:///home/tpm/samples/video/asf//336370-regis-velo862.wmv
22  *  file:///home/tpm/samples/video/asf//336370-eichhoer.wmv
23  * throw errors (not always necessarily) in this code path
24  * (looks like they carry broken payloads/packets though) */
26 #include "asfpacket.h"
28 #include <gst/gstutils.h>
29 #include <gst/gstinfo.h>
30 #include <string.h>
32 /* we are unlikely to deal with lengths > 2GB here any time soon, so just
33  * return a signed int and use that for error reporting */
34 static inline gint
35 asf_packet_read_varlen_int (guint lentype_flags, guint lentype_bit_offset,
36     const guint8 ** p_data, guint * p_size)
37 {
38   static const guint lens[4] = { 0, 1, 2, 4 };
39   guint len, val;
41   len = lens[(lentype_flags >> lentype_bit_offset) & 0x03];
43   /* will make caller bail out with a short read if there's not enough data */
44   if (G_UNLIKELY (*p_size < len)) {
45     GST_WARNING ("need %u bytes, but only %u bytes available", len, *p_size);
46     return -1;
47   }
49   switch (len) {
50     case 0:
51       val = 0;
52       break;
53     case 1:
54       val = GST_READ_UINT8 (*p_data);
55       break;
56     case 2:
57       val = GST_READ_UINT16_LE (*p_data);
58       break;
59     case 4:
60       val = GST_READ_UINT32_LE (*p_data);
61       break;
62     default:
63       g_assert_not_reached ();
64   }
66   *p_data += len;
67   *p_size -= len;
69   return (gint) val;
70 }
72 static GstBuffer *
73 asf_packet_create_payload_buffer (AsfPacket * packet, const guint8 ** p_data,
74     guint * p_size, guint payload_len)
75 {
76   guint off;
78   g_assert (payload_len <= *p_size);
80   off = (guint) (*p_data - GST_BUFFER_DATA (packet->buf));
81   g_assert (off < GST_BUFFER_SIZE (packet->buf));
83   *p_data += payload_len;
84   *p_size -= payload_len;
86   return gst_buffer_create_sub (packet->buf, off, payload_len);
87 }
89 static AsfPayload *
90 asf_payload_find_previous_fragment (AsfPayload * payload, AsfStream * stream)
91 {
92   AsfPayload *ret;
94   if (G_UNLIKELY (stream->payloads->len == 0)) {
95     GST_DEBUG ("No previous fragments to merge with for stream %u", stream->id);
96     return NULL;
97   }
99   ret =
100       &g_array_index (stream->payloads, AsfPayload, stream->payloads->len - 1);
102   if (G_UNLIKELY (ret->mo_size != payload->mo_size ||
103           ret->mo_number != payload->mo_number || ret->mo_offset != 0)) {
104     if (payload->mo_size != 0) {
105       GST_WARNING ("Previous fragment does not match continued fragment");
106       return NULL;
107     } else {
108       /* Warn about this case, but accept it anyway: files in the wild sometimes
109        * have continued packets where the subsequent fragments say that they're
110        * zero-sized. */
111       GST_WARNING ("Previous fragment found, but current fragment has "
112           "zero size, accepting anyway");
113     }
114   }
115 #if 0
116   if (this_fragment->mo_offset + this_payload_len > first_fragment->mo_size) {
117     GST_WARNING ("Merged fragments would be bigger than the media object");
118     return FALSE;
119   }
120 #endif
122   return ret;
125 /* TODO: if we have another payload already queued for this stream and that
126  * payload doesn't have a duration, maybe we can calculate a duration for it
127  * (if the previous timestamp is smaller etc. etc.) */
128 static void
129 gst_asf_payload_queue_for_stream (GstASFDemux * demux, AsfPayload * payload,
130     AsfStream * stream)
132   GST_DEBUG_OBJECT (demux, "Got payload for stream %d ts:%" GST_TIME_FORMAT,
133       stream->id, GST_TIME_ARGS (payload->ts));
134   /* remember the first timestamp in the stream */
135   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (demux->first_ts) &&
136           GST_CLOCK_TIME_IS_VALID (payload->ts))) {
137     GST_DEBUG_OBJECT (demux, "first ts: %" GST_TIME_FORMAT,
138         GST_TIME_ARGS (payload->ts));
139     demux->first_ts = payload->ts;
140   }
142   /* make timestamps start from 0 */
143   if (G_LIKELY (demux->first_ts < payload->ts))
144     payload->ts -= demux->first_ts;
145   else
146     payload->ts = 0;
148   /* remove any incomplete payloads that will never be completed */
149   while (stream->payloads->len > 0) {
150     AsfPayload *prev;
151     guint idx_last;
153     idx_last = stream->payloads->len - 1;
154     prev = &g_array_index (stream->payloads, AsfPayload, idx_last);
156     if (G_UNLIKELY (gst_asf_payload_is_complete (prev)))
157       break;
159     GST_DEBUG_OBJECT (demux, "Dropping incomplete fragmented media object "
160         "queued for stream %u", stream->id);
162     gst_buffer_replace (&prev->buf, NULL);
163     g_array_remove_index (stream->payloads, idx_last);
165     /* there's data missing, so there's a discontinuity now */
166     GST_BUFFER_FLAG_SET (payload->buf, GST_BUFFER_FLAG_DISCONT);
167   }
169   /* If we're about to queue a key frame that is before the segment start, we
170    * can ditch any previously queued payloads (which would also be before the
171    * segment start). This makes sure the decoder doesn't decode more than
172    * absolutely necessary after a seek (we don't push out payloads that are
173    * before the segment start until we have at least one that falls within the
174    * segment) */
175   if (G_UNLIKELY (GST_CLOCK_TIME_IS_VALID (payload->ts) &&
176           payload->ts < demux->segment.start && payload->keyframe)) {
177     GST_DEBUG_OBJECT (demux, "Queueing keyframe before segment start, removing"
178         " %u previously-queued payloads, which would be out of segment too and"
179         " hence don't have to be decoded", stream->payloads->len);
180     while (stream->payloads->len > 0) {
181       AsfPayload *last;
182       guint idx_last;
184       idx_last = stream->payloads->len - 1;
185       last = &g_array_index (stream->payloads, AsfPayload, idx_last);
186       gst_buffer_replace (&last->buf, NULL);
187       g_array_remove_index (stream->payloads, idx_last);
188     }
190     /* Mark discontinuity (should be done via stream->discont anyway though) */
191     GST_BUFFER_FLAG_SET (payload->buf, GST_BUFFER_FLAG_DISCONT);
192   }
194   /* remember the first queued timestamp for the segment */
195   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (demux->segment_ts) &&
196           GST_CLOCK_TIME_IS_VALID (payload->ts))) {
197     GST_DEBUG_OBJECT (demux, "segment ts: %" GST_TIME_FORMAT,
198         GST_TIME_ARGS (payload->ts));
199     demux->segment_ts = payload->ts;
200     /* always note, but only determines segment when streaming */
201     if (demux->streaming)
202       gst_segment_set_seek (&demux->segment, demux->in_segment.rate,
203           GST_FORMAT_TIME, demux->segment.flags, GST_SEEK_TYPE_SET,
204           demux->segment_ts, GST_SEEK_TYPE_NONE, 0, NULL);
205   }
207   g_array_append_vals (stream->payloads, payload, 1);
210 static void
211 asf_payload_parse_replicated_data_extensions (AsfStream * stream,
212     AsfPayload * payload)
214   AsfPayloadExtension *ext;
215   guint off;
217   if (!stream->ext_props.valid || stream->ext_props.payload_extensions == NULL)
218     return;
220   off = 8;
221   for (ext = stream->ext_props.payload_extensions; ext->len > 0; ++ext) {
222     if (G_UNLIKELY (off + ext->len > payload->rep_data_len)) {
223       GST_WARNING ("not enough replicated data for defined extensions");
224       return;
225     }
226     switch (ext->id) {
227       case ASF_PAYLOAD_EXTENSION_DURATION:
228         if (G_LIKELY (ext->len == 2)) {
229           guint16 tdur = GST_READ_UINT16_LE (payload->rep_data + off);
230           /* packet durations of 1ms are mostly invalid */
231           if (tdur != 1)
232             payload->duration = tdur * GST_MSECOND;
233         } else {
234           GST_WARNING ("unexpected DURATION extensions len %u", ext->len);
235         }
236         break;
237       case ASF_PAYLOAD_EXTENSION_SYSTEM_CONTENT:
238         if (G_LIKELY (ext->len == 1)) {
239           guint8 data = payload->rep_data[off];
241           payload->interlaced = data & 0x1;
242           payload->rff = data & 0x8;
243           payload->tff = (data & 0x2) || !(data & 0x4);
244           GST_DEBUG ("SYSTEM_CONTENT: interlaced:%d, rff:%d, tff:%d",
245               payload->interlaced, payload->rff, payload->tff);
246         } else {
247           GST_WARNING ("unexpected SYSTEM_CONTE extensions len %u", ext->len);
248         }
249         break;
250       case ASF_PAYLOAD_EXTENSION_SYSTEM_PIXEL_ASPECT_RATIO:
251         if (G_LIKELY (ext->len == 2)) {
252           payload->par_x = payload->rep_data[off];
253           payload->par_y = payload->rep_data[off + 1];
254           GST_DEBUG ("PAR %d / %d", payload->par_x, payload->par_y);
255         } else {
256           GST_WARNING ("unexpected SYSTEM_PIXEL_ASPECT_RATIO extensions len %u",
257               ext->len);
258         }
259         break;
260       default:
261         GST_WARNING ("UNKNOWN PAYLOAD EXTENSION !");
262         break;
263     }
264     off += ext->len;
265   }
268 static gboolean
269 gst_asf_demux_parse_payload (GstASFDemux * demux, AsfPacket * packet,
270     gint lentype, const guint8 ** p_data, guint * p_size)
272   AsfPayload payload = { 0, };
273   AsfStream *stream;
274   gboolean is_compressed;
275   guint payload_len;
276   guint stream_num;
278   if (G_UNLIKELY (*p_size < 1)) {
279     GST_WARNING_OBJECT (demux, "Short packet!");
280     return FALSE;
281   }
283   stream_num = GST_READ_UINT8 (*p_data) & 0x7f;
284   payload.keyframe = ((GST_READ_UINT8 (*p_data) & 0x80) != 0);
286   *p_data += 1;
287   *p_size -= 1;
289   payload.ts = GST_CLOCK_TIME_NONE;
290   payload.duration = GST_CLOCK_TIME_NONE;
291   payload.par_x = 0;
292   payload.par_y = 0;
293   payload.interlaced = FALSE;
294   payload.tff = FALSE;
295   payload.rff = FALSE;
297   payload.mo_number =
298       asf_packet_read_varlen_int (packet->prop_flags, 4, p_data, p_size);
299   payload.mo_offset =
300       asf_packet_read_varlen_int (packet->prop_flags, 2, p_data, p_size);
301   payload.rep_data_len =
302       asf_packet_read_varlen_int (packet->prop_flags, 0, p_data, p_size);
304   is_compressed = (payload.rep_data_len == 1);
306   GST_LOG_OBJECT (demux, "payload for stream %u", stream_num);
307   GST_LOG_OBJECT (demux, "keyframe   : %s", (payload.keyframe) ? "yes" : "no");
308   GST_LOG_OBJECT (demux, "compressed : %s", (is_compressed) ? "yes" : "no");
310   if (G_UNLIKELY (*p_size < payload.rep_data_len)) {
311     GST_WARNING_OBJECT (demux, "Short packet! rep_data_len=%u, size=%u",
312         payload.rep_data_len, *p_size);
313     return FALSE;
314   }
316   memcpy (payload.rep_data, *p_data,
317       MIN (sizeof (payload.rep_data), payload.rep_data_len));
319   *p_data += payload.rep_data_len;
320   *p_size -= payload.rep_data_len;
322   if (G_UNLIKELY (*p_size == 0)) {
323     GST_WARNING_OBJECT (demux, "payload without data!?");
324     return FALSE;
325   }
327   /* we use -1 as lentype for a single payload that's the size of the packet */
328   if (G_UNLIKELY ((lentype >= 0 && lentype <= 3))) {
329     payload_len = asf_packet_read_varlen_int (lentype, 0, p_data, p_size);
330     if (*p_size < payload_len) {
331       GST_WARNING_OBJECT (demux, "Short packet! payload_len=%u, size=%u",
332           payload_len, *p_size);
333       return FALSE;
334     }
335   } else {
336     payload_len = *p_size;
337   }
339   GST_LOG_OBJECT (demux, "payload length: %u", payload_len);
341   stream = gst_asf_demux_get_stream (demux, stream_num);
343   if (G_UNLIKELY (stream == NULL)) {
344     GST_WARNING_OBJECT (demux, "Payload for unknown stream %u, skipping",
345         stream_num);
346     if (*p_size < payload_len) {
347       *p_data += *p_size;
348       *p_size = 0;
349     } else {
350       *p_data += payload_len;
351       *p_size -= payload_len;
352     }
353     return TRUE;
354   }
356   if (G_UNLIKELY (!is_compressed)) {
357     GST_LOG_OBJECT (demux, "replicated data length: %u", payload.rep_data_len);
359     if (payload.rep_data_len >= 8) {
360       payload.mo_size = GST_READ_UINT32_LE (payload.rep_data);
361       payload.ts = GST_READ_UINT32_LE (payload.rep_data + 4) * GST_MSECOND;
362       if (G_UNLIKELY (payload.ts < demux->preroll))
363         payload.ts = 0;
364       else
365         payload.ts -= demux->preroll;
366       asf_payload_parse_replicated_data_extensions (stream, &payload);
368       GST_LOG_OBJECT (demux, "media object size   : %u", payload.mo_size);
369       GST_LOG_OBJECT (demux, "media object ts     : %" GST_TIME_FORMAT,
370           GST_TIME_ARGS (payload.ts));
371       GST_LOG_OBJECT (demux, "media object dur    : %" GST_TIME_FORMAT,
372           GST_TIME_ARGS (payload.duration));
373     } else if (payload.rep_data_len != 0) {
374       GST_WARNING_OBJECT (demux, "invalid replicated data length, very bad");
375       *p_data += payload_len;
376       *p_size -= payload_len;
377       return FALSE;
378     }
380     GST_LOG_OBJECT (demux, "media object offset : %u", payload.mo_offset);
382     GST_LOG_OBJECT (demux, "payload length: %u", payload_len);
384     if (payload_len == 0) {
385       GST_DEBUG_OBJECT (demux, "skipping empty payload");
386     } else if (payload.mo_offset == 0 && payload.mo_size == payload_len) {
387       /* if the media object is not fragmented, just create a sub-buffer */
388       GST_LOG_OBJECT (demux, "unfragmented media object size %u", payload_len);
389       payload.buf = asf_packet_create_payload_buffer (packet, p_data, p_size,
390           payload_len);
391       payload.buf_filled = payload_len;
392       gst_asf_payload_queue_for_stream (demux, &payload, stream);
393     } else {
394       const guint8 *payload_data = *p_data;
396       g_assert (payload_len <= *p_size);
398       *p_data += payload_len;
399       *p_size -= payload_len;
401       /* n-th fragment of a fragmented media object? */
402       if (payload.mo_offset != 0) {
403         AsfPayload *prev;
405         if ((prev = asf_payload_find_previous_fragment (&payload, stream))) {
406           if (prev->buf == NULL || payload.mo_size != prev->mo_size ||
407               payload.mo_offset >= GST_BUFFER_SIZE (prev->buf) ||
408               payload.mo_offset + payload_len > GST_BUFFER_SIZE (prev->buf)) {
409             GST_WARNING_OBJECT (demux, "Offset doesn't match previous data?!");
410           } else {
411             /* we assume fragments are payloaded with increasing mo_offset */
412             if (payload.mo_offset != prev->buf_filled) {
413               GST_WARNING_OBJECT (demux, "media object payload discontinuity: "
414                   "offset=%u vs buf_filled=%u", payload.mo_offset,
415                   prev->buf_filled);
416             }
417             memcpy (GST_BUFFER_DATA (prev->buf) + payload.mo_offset,
418                 payload_data, payload_len);
419             prev->buf_filled =
420                 MAX (prev->buf_filled, payload.mo_offset + payload_len);
421             GST_LOG_OBJECT (demux, "Merged media object fragments, size now %u",
422                 prev->buf_filled);
423           }
424         } else {
425           GST_DEBUG_OBJECT (demux, "n-th payload fragment, but don't have "
426               "any previous fragment, ignoring payload");
427         }
428       } else {
429         GST_LOG_OBJECT (demux, "allocating buffer of size %u for fragmented "
430             "media object", payload.mo_size);
431         payload.buf = gst_buffer_new_and_alloc (payload.mo_size);
432         memcpy (GST_BUFFER_DATA (payload.buf), payload_data, payload_len);
433         payload.buf_filled = payload_len;
435         gst_asf_payload_queue_for_stream (demux, &payload, stream);
436       }
437     }
438   } else {
439     const guint8 *payload_data;
440     GstClockTime ts, ts_delta;
441     guint num;
443     GST_LOG_OBJECT (demux, "Compressed payload, length=%u", payload_len);
445     payload_data = *p_data;
447     *p_data += payload_len;
448     *p_size -= payload_len;
450     ts = payload.mo_offset * GST_MSECOND;
451     if (G_UNLIKELY (ts < demux->preroll))
452       ts = 0;
453     else
454       ts -= demux->preroll;
455     ts_delta = payload.rep_data[0] * GST_MSECOND;
457     for (num = 0; payload_len > 0; ++num) {
458       guint sub_payload_len;
460       sub_payload_len = GST_READ_UINT8 (payload_data);
462       GST_LOG_OBJECT (demux, "subpayload #%u: len=%u, ts=%" GST_TIME_FORMAT,
463           num, sub_payload_len, GST_TIME_ARGS (ts));
465       ++payload_data;
466       --payload_len;
468       if (G_UNLIKELY (payload_len < sub_payload_len)) {
469         GST_WARNING_OBJECT (demux, "Short payload! %u bytes left", payload_len);
470         return FALSE;
471       }
473       if (G_LIKELY (sub_payload_len > 0)) {
474         payload.buf = asf_packet_create_payload_buffer (packet,
475             &payload_data, &payload_len, sub_payload_len);
476         payload.buf_filled = sub_payload_len;
478         payload.ts = ts;
479         if (G_LIKELY (ts_delta))
480           payload.duration = ts_delta;
481         else
482           payload.duration = GST_CLOCK_TIME_NONE;
484         gst_asf_payload_queue_for_stream (demux, &payload, stream);
485       }
487       ts += ts_delta;
488     }
489   }
491   return TRUE;
494 gboolean
495 gst_asf_demux_parse_packet (GstASFDemux * demux, GstBuffer * buf)
497   AsfPacket packet = { 0, };
498   const guint8 *data;
499   gboolean has_multiple_payloads;
500   gboolean ret = TRUE;
501   guint8 ec_flags, flags1;
502   guint size;
504   data = GST_BUFFER_DATA (buf);
505   size = GST_BUFFER_SIZE (buf);
506   GST_LOG_OBJECT (demux, "Buffer size: %u", size);
508   /* need at least two payload flag bytes, send time, and duration */
509   if (G_UNLIKELY (size < 2 + 4 + 2))
510     goto short_packet;
512   packet.buf = buf;
514   ec_flags = GST_READ_UINT8 (data);
516   /* skip optional error correction stuff */
517   if ((ec_flags & 0x80) != 0) {
518     guint ec_len_type, ec_len;
520     ec_len_type = (ec_flags & 0x60) >> 5;
521     if (ec_len_type == 0) {
522       ec_len = ec_flags & 0x0f;
523     } else {
524       GST_WARNING_OBJECT (demux, "unexpected error correction length type %u",
525           ec_len_type);
526       ec_len = 2;
527     }
528     GST_LOG_OBJECT (demux, "packet has error correction (%u bytes)", ec_len);
530     /* still need at least two payload flag bytes, send time, and duration */
531     if (size <= (1 + ec_len) + 2 + 4 + 2)
532       goto short_packet;
534     data += 1 + ec_len;
535     size -= 1 + ec_len;
536   }
538   /* parse payload info */
539   flags1 = GST_READ_UINT8 (data);
540   packet.prop_flags = GST_READ_UINT8 (data + 1);
542   data += 2;
543   size -= 2;
545   has_multiple_payloads = (flags1 & 0x01) != 0;
547   packet.length = asf_packet_read_varlen_int (flags1, 5, &data, &size);
549   packet.sequence = asf_packet_read_varlen_int (flags1, 1, &data, &size);
551   packet.padding = asf_packet_read_varlen_int (flags1, 3, &data, &size);
553   if (G_UNLIKELY (size < 6))
554     goto short_packet;
556   packet.send_time = GST_READ_UINT32_LE (data) * GST_MSECOND;
557   packet.duration = GST_READ_UINT16_LE (data + 4) * GST_MSECOND;
559   data += 4 + 2;
560   size -= 4 + 2;
562   GST_LOG_OBJECT (demux, "flags            : 0x%x", flags1);
563   GST_LOG_OBJECT (demux, "multiple payloads: %u", has_multiple_payloads);
564   GST_LOG_OBJECT (demux, "packet length    : %u", packet.length);
565   GST_LOG_OBJECT (demux, "sequence         : %u", packet.sequence);
566   GST_LOG_OBJECT (demux, "padding          : %u", packet.padding);
567   GST_LOG_OBJECT (demux, "send time        : %" GST_TIME_FORMAT,
568       GST_TIME_ARGS (packet.send_time));
569   GST_LOG_OBJECT (demux, "duration         : %" GST_TIME_FORMAT,
570       GST_TIME_ARGS (packet.duration));
572   if (G_UNLIKELY (packet.padding == (guint) - 1 || size < packet.padding))
573     goto short_packet;
575   size -= packet.padding;
577   /* adjust available size for parsing if there's less actual packet data for
578    * parsing than there is data in bytes (for sample see bug 431318) */
579   if (G_UNLIKELY (packet.length != 0 && packet.padding == 0
580           && packet.length < demux->packet_size)) {
581     GST_LOG_OBJECT (demux, "shortened packet with implicit padding, "
582         "adjusting available data size");
583     if (size < demux->packet_size - packet.length) {
584       /* the buffer is smaller than the implicit padding */
585       goto short_packet;
586     } else {
587       /* subtract the implicit padding */
588       size -= (demux->packet_size - packet.length);
589     }
590   }
592   if (has_multiple_payloads) {
593     guint i, num, lentype;
595     if (G_UNLIKELY (size < 1))
596       goto short_packet;
598     num = (GST_READ_UINT8 (data) & 0x3F) >> 0;
599     lentype = (GST_READ_UINT8 (data) & 0xC0) >> 6;
601     ++data;
602     --size;
604     GST_LOG_OBJECT (demux, "num payloads     : %u", num);
606     for (i = 0; i < num; ++i) {
607       GST_LOG_OBJECT (demux, "Parsing payload %u/%u, size left: %u", i + 1, num,
608           size);
610       ret = gst_asf_demux_parse_payload (demux, &packet, lentype, &data, &size);
612       if (G_UNLIKELY (!ret)) {
613         GST_WARNING_OBJECT (demux, "Failed to parse payload %u/%u", i + 1, num);
614         break;
615       }
616     }
617   } else {
618     GST_LOG_OBJECT (demux, "Parsing single payload");
619     ret = gst_asf_demux_parse_payload (demux, &packet, -1, &data, &size);
620   }
622   return ret;
624 /* ERRORS */
625 short_packet:
626   {
627     GST_WARNING_OBJECT (demux, "Short packet!");
628     return FALSE;
629   }