Skip to content

Commit

Permalink
Ensure input stream is closed on error - addresses issue #95
Browse files Browse the repository at this point in the history
 * Add a "virtual" close function to the gstworker class, call this when error is handled
 * Add a "virtual" close function to the gsocketinputstream class, which ensures
   that the socket is closed rather than relying on unref at finalize
 * Add video caps to inputsrc pipeline so that video format errors are
   detected and handled directly
 * Add a kludge to redirect warnings that are actually errors to the correct handler
  • Loading branch information
David Nugent authored and mithro committed Jan 16, 2015
1 parent 07f0fc9 commit f3eb3eb
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 21 deletions.
24 changes: 18 additions & 6 deletions tools/gio/gsocketinputstream.c
Expand Up @@ -85,22 +85,32 @@ g_socket_input_stream_set_property (GObject * object,
}
}

static gboolean
g_socket_input_stream_close (GInputStream * s,
GCancellable * cancellable, GError ** error)
{
GSocketInputStreamX *stream = G_SOCKET_INPUT_STREAM (s);
gboolean ret = TRUE;

if (stream->priv->socket) {
ret = g_socket_close (stream->priv->socket, error);
g_object_unref (stream->priv->socket);
stream->priv->socket = NULL;
}
return ret;
}

static void
g_socket_input_stream_finalize (GObject * object)
{
GSocketInputStreamX *stream = G_SOCKET_INPUT_STREAM (object);

if (stream->priv->socket) {
GError *error = NULL;
/*
g_print ("%s:%d: %s, %d\n", __FILE__, __LINE__, __FUNCTION__,
g_socket_get_fd (stream->priv->socket));
*/
g_socket_close (stream->priv->socket, &error);
g_socket_input_stream_close (&stream->parent_instance, NULL, &error);
if (error) {
//ERROR ("%s", error->message);
}
g_object_unref (stream->priv->socket);
}

if (G_OBJECT_CLASS (g_socket_input_stream_parent_class)->finalize)
Expand All @@ -117,6 +127,7 @@ g_socket_input_stream_read (GInputStream * stream,
buffer, count, TRUE, cancellable, error);
}


static void
g_socket_input_stream_class_init (GSocketInputStreamXClass * klass)
{
Expand All @@ -130,6 +141,7 @@ g_socket_input_stream_class_init (GSocketInputStreamXClass * klass)
gobject_class->set_property = g_socket_input_stream_set_property;

ginputstream_class->read_fn = g_socket_input_stream_read;
ginputstream_class->close_fn = g_socket_input_stream_close;

g_object_class_install_property (gobject_class, PROP_SOCKET,
g_param_spec_object ("socket",
Expand Down
25 changes: 18 additions & 7 deletions tools/gstcase.c
Expand Up @@ -90,24 +90,20 @@ gst_case_init (GstCase * cas)
}

/**
* @param cas The GstCase instance.
* @param cas the GstCase instance.
* @memberof GstCase
*
* Disposing from it's parent object.
*
* @see GObject
* Closes/releases resources used by the GstCase
*/
static void
gst_case_dispose (GstCase * cas)
gst_case_close (GstCase * cas)
{
if (cas->stream) {
#if 0
GError *error = NULL;
g_input_stream_close (cas->stream, NULL, &error);
if (error) {
ERROR ("%s", error->message);
}
#endif
g_object_unref (cas->stream);
cas->stream = NULL;
}
Expand All @@ -121,6 +117,20 @@ gst_case_dispose (GstCase * cas)
g_object_unref (cas->branch);
cas->branch = NULL;
}
}

/**
* @param cas The GstR (Case instance.
* @memberof GstCase
*
* Disposing from it's parent object.
*
* @see GObject
*/
static void
gst_case_dispose (GstCase * cas)
{
gst_case_close (cas);
//INFO ("dispose %p", cas);
G_OBJECT_CLASS (parent_class)->dispose (G_OBJECT (cas));
}
Expand Down Expand Up @@ -546,4 +556,5 @@ gst_case_class_init (GstCaseClass * klass)
worker_class->prepare = (GstWorkerPrepareFunc) gst_case_prepare;
worker_class->get_pipeline_string = (GstWorkerGetPipelineStringFunc)
gst_case_get_pipeline_string;
worker_class->close = (GstWorkerCloseFunc) gst_case_close;
}
30 changes: 22 additions & 8 deletions tools/gstworker.c
Expand Up @@ -32,6 +32,8 @@
#include "gstworker.h"
#include "gstswitchserver.h"

#include <string.h>

#define GST_WORKER_LOCK_PIPELINE(srv) (g_mutex_lock (&(srv)->pipeline_lock))
#define GST_WORKER_UNLOCK_PIPELINE(srv) (g_mutex_unlock (&(srv)->pipeline_lock))

Expand Down Expand Up @@ -400,10 +402,9 @@ gst_worker_stop_force (GstWorker * worker, gboolean force)

g_timeout_add (5,
(GSourceFunc) gst_worker_state_ready_to_null_proxy, worker);
}
else if (state == GST_STATE_PLAYING) {
} else if (state == GST_STATE_PLAYING) {
/* Send an EOS to cleanly shutdown */
gst_element_send_event (worker->pipeline, gst_event_new_eos());
gst_element_send_event (worker->pipeline, gst_event_new_eos ());

/* Go to sleep until the EOS handler calls stop_force (worker, TRUE); */
g_cond_wait (&worker->shutdown_cond, &worker->pipeline_lock);
Expand Down Expand Up @@ -457,6 +458,7 @@ static void
gst_worker_handle_eos (GstWorker * worker)
{
gst_worker_stop_force (worker, TRUE);
GST_WORKER_CLASS (G_OBJECT_GET_CLASS (worker))->close (worker);
}

static void
Expand Down Expand Up @@ -489,16 +491,20 @@ gst_worker_handle_error (GstWorker * worker, GError * error, const char *debug)
}
ERROR ("DEBUG INFO:\n%s\n", debug);

#if 0
gst_worker_stop (worker);
#endif
GstWorkerClass *worker_class = GST_WORKER_CLASS (G_OBJECT_GET_CLASS (worker));
worker_class->close (worker);
}

static void
gst_worker_handle_warning (GstWorker * worker, GError * error,
const char *debug)
{
WARN ("%s: %s (%s)", worker->name, error->message, debug);
// kludge: some gstreamer "warnings" are apparently non-recoverable errors
if (strstr (error->message, "error:") != NULL)
gst_worker_handle_error (worker, error, debug);
else
WARN ("%s: %s (%s)", worker->name, error->message, debug);
}

static void
Expand Down Expand Up @@ -609,7 +615,7 @@ gst_worker_pipeline_state_changed (GstWorker * worker,
}

static GstBusSyncReply
gst_worker_message_sync (GstBus * bus, GstMessage * message, GstWorker *worker)
gst_worker_message_sync (GstBus * bus, GstMessage * message, GstWorker * worker)
{
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
Expand Down Expand Up @@ -782,7 +788,8 @@ gst_worker_prepare_unsafe (GstWorker * worker)
if (!worker->watch)
goto error_add_watch;

gst_bus_set_sync_handler (worker->bus, (GstBusSyncHandler) (gst_worker_message_sync), worker, NULL);
gst_bus_set_sync_handler (worker->bus,
(GstBusSyncHandler) (gst_worker_message_sync), worker, NULL);

if (workerclass->prepare && !workerclass->prepare (worker))
goto error_prepare;
Expand Down Expand Up @@ -889,6 +896,12 @@ gst_worker_reset (GstWorker * worker)
return ok;
}

static void
gst_worker_close (GstWorker * worker)
{

}

/**
* @brief Initialize GstWorkerClass.
* @param klass The instance of GstWorkerClass.
Expand Down Expand Up @@ -934,4 +947,5 @@ gst_worker_class_init (GstWorkerClass * klass)
klass->create_pipeline = gst_worker_create_pipeline;
klass->null = gst_worker_null;
klass->reset = gst_worker_reset;
klass->close = gst_worker_close;
}
14 changes: 14 additions & 0 deletions tools/gstworker.h
Expand Up @@ -91,6 +91,13 @@ typedef GstWorkerNullReturn (*GstWorkerNullFunc) (GstWorker * worker);
*/
typedef void (*GstWorkerAliveFunc) (GstWorker * worker);

/**
* @brief worker virtual close function
* @param worker The GstWorker instance.
*/

typedef void (*GstWorkerCloseFunc) (GstWorker * worker);

/**
* @class GstWorker
* @struct _GstWorker
Expand Down Expand Up @@ -207,6 +214,13 @@ struct _GstWorkerClass
* @param worker The GstWorker instance.
*/
gboolean (*reset) (GstWorker * worker);

/*
* @brief Close the worker, deallocating and closing resources
* @param worker The GstWorker instance.
*/

void (*close) (GstWorker * worker);
};

/**
Expand Down

0 comments on commit f3eb3eb

Please sign in to comment.