Manpages - zmq_socket_monitor_versioned.3

NAME

zmq_socket_monitor_versioned - monitor socket events

SYNOPSIS

int zmq_socket_monitor_versioned (void *socket, char *endpoint, uint64_t events, int event_version, int type);

int zmq_socket_monitor_pipes_stats (void *socket);

DESCRIPTION

The zmq_socket_monitor_versioned() method lets an application thread track socket events (like connects) on a ZeroMQ socket. Each call to this method creates a ZMQ_PAIR socket and binds that to the specified inproc:// endpoint. To collect the socket events, you must create your own ZMQ_PAIR socket, and connect that to the endpoint.

The events argument is a bitmask of the socket events you wish to monitor; see the Supported events sections below. To monitor all events for a version, use the event value ZMQ_EVENT_ALL_V<version>, e.g. ZMQ_EVENT_ALL_V1. For non-DRAFT event versions, this value will not change in the future, so new event types will only be added in new versions (note that this differs from zmq_socket_monitor and the unversioned ZMQ_EVENT_ALL).
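As a minimal sketch of how such a bitmask can be built and tested, the following uses locally defined stand-in flags. The EX_ names, their values, and both helper functions are assumptions made for illustration; real code should use the ZMQ_EVENT_* constants from zmq.h.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for ZMQ_EVENT_* flags; take the real values from zmq.h. */
#define EX_EVENT_CONNECTED 0x0001u
#define EX_EVENT_LISTENING 0x0008u
#define EX_EVENT_ACCEPTED 0x0020u

/* Combine individual event flags into the mask passed as the events argument. */
static uint64_t ex_build_mask (const uint64_t *flags, int count)
{
    assert (flags != NULL || count == 0);
    uint64_t mask = 0;
    for (int i = 0; i < count; ++i)
        mask |= flags[i];
    return mask;
}

/* Return 1 if the given event flag is selected by the mask, else 0. */
static int ex_mask_selects (uint64_t mask, uint64_t event)
{
    return (mask & event) != 0;
}
```

The resulting mask would be passed as the events argument; events whose flag is not set in the mask are not reported.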

Note that event_version 2 is currently in DRAFT mode. The protocol may be changed at any time for event_versions in DRAFT.

ZMQ_CURRENT_EVENT_VERSION and ZMQ_CURRENT_EVENT_VERSION_DRAFT are always defined to the most recent stable and DRAFT event versions, which are currently 1 and 2, respectively.

This page describes the protocol for event_version 2 only. For the protocol used with event_version 1, please refer to *zmq_socket_monitor*(3).

Each event is sent in multiple frames. The first frame contains an event number (64 bits). The number and content of further frames depend on this event number.

Unless specified otherwise, the second frame contains the number of value frames that follow it, as a 64-bit integer. The third through N-th frames each contain an event value (64 bits) that provides additional data according to the event number. Each event type might have a different number of values. The second-to-last and last frames contain strings that specify the affected connection or endpoint: the former denotes the local endpoint, the latter the remote endpoint. Either of these may be empty, depending on the event type and whether the connection uses a bound or connected local endpoint.
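The frame layout described above can be sketched as a decoder that works on frames already received into memory. This is an illustration only: ex_frame_t, ex_event_t, EX_MAX_VALUES and the functions are hypothetical names, not part of libzmq, and integers are copied in host byte order as the example on this page does.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define EX_MAX_VALUES 8 /* arbitrary cap chosen for this sketch */

/* One received frame: pointer to its bytes and its size. */
typedef struct
{
    const void *data;
    size_t size;
} ex_frame_t;

/* Decoded form of one event message. Address strings are not
   NUL-terminated; use the accompanying sizes. */
typedef struct
{
    uint64_t event;
    uint64_t value_count;
    uint64_t values[EX_MAX_VALUES];
    const char *local;
    size_t local_size;
    const char *remote;
    size_t remote_size;
} ex_event_t;

/* Copy a 64-bit integer out of a frame; -1 if the frame size is wrong. */
static int ex_read_u64 (const ex_frame_t *frame, uint64_t *out)
{
    if (frame->size != sizeof (uint64_t))
        return -1;
    memcpy (out, frame->data, sizeof (uint64_t));
    return 0;
}

/* Decode frames laid out as: event number, value count, N values,
   local endpoint, remote endpoint. Returns 0 on success, -1 if malformed. */
static int ex_decode_event (const ex_frame_t *frames, size_t frame_count,
                            ex_event_t *out)
{
    assert (frames != NULL && out != NULL);
    if (frame_count < 4)
        return -1;
    if (ex_read_u64 (&frames[0], &out->event) != 0
        || ex_read_u64 (&frames[1], &out->value_count) != 0)
        return -1;
    if (out->value_count > EX_MAX_VALUES
        || frame_count != out->value_count + 4)
        return -1;
    for (uint64_t i = 0; i < out->value_count; ++i) {
        if (ex_read_u64 (&frames[2 + i], &out->values[i]) != 0)
            return -1;
    }
    out->local = (const char *) frames[frame_count - 2].data;
    out->local_size = frames[frame_count - 2].size;
    out->remote = (const char *) frames[frame_count - 1].data;
    out->remote_size = frames[frame_count - 1].size;
    return 0;
}
```

Checking the frame count against the announced number of values rejects truncated messages before any value is read.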

Note that the format of the second and further frames, and also the number of frames, may be different for events added in the future.

The type argument specifies the type of the monitoring socket. Supported types are ZMQ_PAIR, ZMQ_PUB and ZMQ_PUSH. Consumers of the events must use a compatible socket type; for instance, a monitoring socket of type ZMQ_PUB requires consumers of type ZMQ_SUB. When the monitoring socket is of type ZMQ_PUB, the multipart message topic is the event number, so consumers should subscribe to the events they want to receive.
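For a ZMQ_PUB monitoring socket, a subscription topic for one event can be sketched as the raw bytes of the 64-bit event number. The helper names below are hypothetical, and host byte order is an assumption based on the memcpy-based decoding used in the example on this page.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Write the 8-byte topic for an event number into out (which must hold
   at least 8 bytes) and return the topic size. Host byte order assumed. */
static size_t ex_event_topic (uint64_t event, unsigned char *out)
{
    assert (out != NULL);
    memcpy (out, &event, sizeof (event));
    return sizeof (event);
}

/* Prefix match in the style of subscription filters:
   1 if the frame begins with the topic bytes, else 0. */
static int ex_topic_matches (const unsigned char *frame, size_t frame_size,
                             const unsigned char *topic, size_t topic_size)
{
    return frame_size >= topic_size
           && memcmp (frame, topic, topic_size) == 0;
}
```

The buffer and size produced by ex_event_topic() would then be passed to zmq_setsockopt() with the ZMQ_SUBSCRIBE option on the ZMQ_SUB consumer, once per event of interest.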

The zmq_socket_monitor_pipes_stats() method triggers an event of type ZMQ_EVENT_PIPES_STATS for each connected peer of the monitored socket. NOTE: zmq_socket_monitor_pipes_stats() is in DRAFT state.

NOTE: Monitoring events are only generated by some transports. At the moment these are SOCKS, TCP, IPC, and TIPC. It is not an error to call zmq_socket_monitor_versioned on a socket that is connected or bound to some other transport, as this may not be known at the time zmq_socket_monitor_versioned is called. Also, a socket can be connected or bound to multiple endpoints using different transports.

Supported events (v1)

    ZMQ_EVENT_CONNECTED
    ~~~~~~~~~~~~~~~~~~~
    The socket has successfully connected to a remote peer. The event value
    is the file descriptor (FD) of the underlying network socket. Warning:
    there is no guarantee that the FD is still valid by the time your code
    receives this event.

    ZMQ_EVENT_CONNECT_DELAYED
    ~~~~~~~~~~~~~~~~~~~~~~~~~
    A connect request on the socket is pending. The event value is unspecified.

    ZMQ_EVENT_CONNECT_RETRIED
    ~~~~~~~~~~~~~~~~~~~~~~~~~
    A connect request failed, and is now being retried. The event value is the
    reconnect interval in milliseconds. Note that the reconnect interval is
    recalculated for each retry.

    ZMQ_EVENT_LISTENING
    ~~~~~~~~~~~~~~~~~~~
    The socket was successfully bound to a network interface. The event value
    is the FD of the underlying network socket. Warning: there is no guarantee
    that the FD is still valid by the time your code receives this event.

    ZMQ_EVENT_BIND_FAILED
    ~~~~~~~~~~~~~~~~~~~~~
    The socket could not bind to a given interface. The event value is the
    errno generated by the system bind call.

    ZMQ_EVENT_ACCEPTED
    ~~~~~~~~~~~~~~~~~~
    The socket has accepted a connection from a remote peer. The event value is
    the FD of the underlying network socket. Warning: there is no guarantee that
    the FD is still valid by the time your code receives this event.

    ZMQ_EVENT_ACCEPT_FAILED
    ~~~~~~~~~~~~~~~~~~~~~~~
    The socket has rejected a connection from a remote peer. The event value is
    the errno generated by the accept call.

    ZMQ_EVENT_CLOSED
    ~~~~~~~~~~~~~~~~
    The socket was closed. The event value is the FD of the (now closed) network
    socket.

    ZMQ_EVENT_CLOSE_FAILED
    ~~~~~~~~~~~~~~~~~~~~~~
    The socket close failed. The event value is the errno returned by the system
    call. Note that this event occurs only on IPC transports.

    ZMQ_EVENT_DISCONNECTED
    ~~~~~~~~~~~~~~~~~~~~~~
    The socket was disconnected unexpectedly. The event value is the FD of the
    underlying network socket. Warning: this socket will be closed.

    ZMQ_EVENT_MONITOR_STOPPED
    ~~~~~~~~~~~~~~~~~~~~~~~~~
    Monitoring on this socket ended.

    ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    Unspecified error during handshake.
    The event value is an errno.

    ZMQ_EVENT_HANDSHAKE_SUCCEEDED
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    The ZMTP security mechanism handshake succeeded.
    The event value is unspecified.

    ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    The ZMTP security mechanism handshake failed due to some mechanism protocol
    error, either between the ZMTP mechanism peers, or between the mechanism
    server and the ZAP handler. This indicates a configuration or implementation
    error in either peer or in the ZAP handler.
    The event value is one of the ZMQ_PROTOCOL_ERROR_* values:
    ZMQ_PROTOCOL_ERROR_ZMTP_UNSPECIFIED
    ZMQ_PROTOCOL_ERROR_ZMTP_UNEXPECTED_COMMAND
    ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE
    ZMQ_PROTOCOL_ERROR_ZMTP_KEY_EXCHANGE
    ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_UNSPECIFIED
    ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_MESSAGE
    ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_HELLO
    ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_INITIATE
    ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR
    ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_READY
    ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_WELCOME
    ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_METADATA
    ZMQ_PROTOCOL_ERROR_ZMTP_CRYPTOGRAPHIC
    ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH
    ZMQ_PROTOCOL_ERROR_ZAP_UNSPECIFIED
    ZMQ_PROTOCOL_ERROR_ZAP_MALFORMED_REPLY
    ZMQ_PROTOCOL_ERROR_ZAP_BAD_REQUEST_ID
    ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION
    ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE
    ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA

    ZMQ_EVENT_HANDSHAKE_FAILED_AUTH
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    The ZMTP security mechanism handshake failed due to an authentication failure.
    The event value is the status code returned by the ZAP handler (i.e. 300,
    400 or 500).
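A consumer might translate the ZMQ_EVENT_HANDSHAKE_FAILED_AUTH event value into a category before logging or retrying. The sketch below is illustrative: the enum and function names are hypothetical, and the meanings attached to 300, 400 and 500 follow the ZAP specification as understood here, so check them against the ZAP handler in use.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical categories for the ZAP status code carried in the event value. */
typedef enum
{
    EX_AUTH_TEMPORARY, /* 300: temporary error, retrying later may help */
    EX_AUTH_DENIED,    /* 400: authentication failure */
    EX_AUTH_INTERNAL,  /* 500: internal error in the ZAP handler */
    EX_AUTH_UNKNOWN    /* anything else */
} ex_auth_failure_t;

/* Map the 64-bit event value to a category. */
static ex_auth_failure_t ex_classify_zap_status (uint64_t status)
{
    switch (status) {
        case 300:
            return EX_AUTH_TEMPORARY;
        case 400:
            return EX_AUTH_DENIED;
        case 500:
            return EX_AUTH_INTERNAL;
        default:
            return EX_AUTH_UNKNOWN;
    }
}

/* Human-readable text for a category, e.g. for log output. */
static const char *ex_auth_failure_text (ex_auth_failure_t kind)
{
    static const char *const text[] = {"temporary error",
                                       "authentication failure",
                                       "internal error", "unknown status"};
    assert ((int) kind >= 0 && kind <= EX_AUTH_UNKNOWN);
    return text[kind];
}
```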

Supported events (v2)

    ZMQ_EVENT_PIPE_STATS
    ~~~~~~~~~~~~~~~~~~~~
    This event provides two values, the number of messages in each of the two
    queues associated with the returned endpoint (respectively egress and ingress).
    This event only triggers after calling the function
    zmq_socket_monitor_pipes_stats().
    NOTE: this measurement is asynchronous, so by the time the message is received
    the internal state might have already changed.
    NOTE: when the monitored socket and the monitor are not used in a poll, the
    event might not be delivered until an API has been called on the monitored
    socket, like zmq_getsockopt for example (the option is irrelevant).
    NOTE: in DRAFT state, not yet available in stable releases.

RETURN VALUE

The zmq_socket_monitor_versioned() and zmq_socket_monitor_pipes_stats() functions return a value of 0 or greater if successful. Otherwise they return -1 and set errno to one of the values defined below.

ERRORS - ZMQ_SOCKET_MONITOR_VERSIONED()

ETERM

The 0MQ context associated with the specified socket was terminated.

EPROTONOSUPPORT

The transport protocol of the monitor endpoint is not supported. Monitor sockets are required to use the inproc:// transport.

EINVAL

The monitor endpoint supplied does not exist or the specified socket type is not supported.

ERRORS - ZMQ_SOCKET_MONITOR_PIPES_STATS()

ENOTSOCK

The socket parameter was not a valid 0MQ socket.

EINVAL

The socket did not have monitoring enabled.

EAGAIN

The monitored socket did not have any connections to monitor yet.

EXAMPLE

Monitoring client and server sockets.

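    //  Event version 2 is in DRAFT state; with libzmq builds that guard
    //  draft APIs, ZMQ_BUILD_DRAFT_API must be defined before zmq.h.
    #define ZMQ_BUILD_DRAFT_API
    #include <assert.h>
    #include <stdint.h>
    #include <stdlib.h>
    #include <string.h>
    #include <zmq.h>

    //  bounce() and close_zero_linger() used in main() are helper functions
    //  assumed to be defined elsewhere; they are not part of the libzmq API.

    //  Read one event off the monitor socket; return values and addresses
    //  by reference, if not null, and event number by value. Returns -1
    //  in case of error. The caller frees the returned address strings.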

    static int64_t
    get_monitor_event (void *monitor, uint64_t *value_, char **local_address_, char **remote_address_)
    {
        //  First frame in message contains event number
        zmq_msg_t msg;
        zmq_msg_init (&msg);
        if (zmq_msg_recv (&msg, monitor, 0) == -1)
            return -1;              //  Interrupted, presumably
        assert (zmq_msg_more (&msg));

        uint64_t event;
        memcpy (&event, zmq_msg_data (&msg), sizeof (event));
        zmq_msg_close (&msg);

        //  Second frame in message contains the number of values
        zmq_msg_init (&msg);
        if (zmq_msg_recv (&msg, monitor, 0) == -1)
            return -1;              //  Interrupted, presumably
        assert (zmq_msg_more (&msg));

        uint64_t value_count;
        memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count));
        zmq_msg_close (&msg);

        for (uint64_t i = 0; i < value_count; ++i) {
            //  Subsequent frames in message contain event values
            zmq_msg_init (&msg);
            if (zmq_msg_recv (&msg, monitor, 0) == -1)
                return -1;              //  Interrupted, presumably
            assert (zmq_msg_more (&msg));

            if (value_)
                memcpy (value_ + i, zmq_msg_data (&msg), sizeof (*value_));
            zmq_msg_close (&msg);
        }

        //  Second-to-last frame in message contains local address
        zmq_msg_init (&msg);
        if (zmq_msg_recv (&msg, monitor, 0) == -1)
            return -1;              //  Interrupted, presumably
        assert (zmq_msg_more (&msg));

        if (local_address_) {
            uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
            size_t size = zmq_msg_size (&msg);
            *local_address_ = (char *) malloc (size + 1);
            memcpy (*local_address_, data, size);
            (*local_address_)[size] = 0;
        }
        zmq_msg_close (&msg);

        //  Last frame in message contains remote address
        zmq_msg_init (&msg);
        if (zmq_msg_recv (&msg, monitor, 0) == -1)
            return -1;              //  Interrupted, presumably
        assert (!zmq_msg_more (&msg));

        if (remote_address_) {
            uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
            size_t size = zmq_msg_size (&msg);
            *remote_address_ = (char *) malloc (size + 1);
            memcpy (*remote_address_, data, size);
            (*remote_address_)[size] = 0;
        }
        zmq_msg_close (&msg);

        return event;
    }

    int main (void)
    {
        void *ctx = zmq_ctx_new ();
        assert (ctx);

        //  We'll monitor these two sockets
        void *client = zmq_socket (ctx, ZMQ_DEALER);
        assert (client);
        void *server = zmq_socket (ctx, ZMQ_DEALER);
        assert (server);

        //  Socket monitoring only works over inproc://
        int rc = zmq_socket_monitor_versioned (client, "tcp://127.0.0.1:9999", 0, 2, ZMQ_PAIR);
        assert (rc == -1);
        assert (zmq_errno () == EPROTONOSUPPORT);

        //  Monitor all events on client and server sockets
        rc = zmq_socket_monitor_versioned (client, "inproc://monitor-client", ZMQ_EVENT_ALL_V2, 2, ZMQ_PAIR);
        assert (rc == 0);
        rc = zmq_socket_monitor_versioned (server, "inproc://monitor-server", ZMQ_EVENT_ALL_V2, 2, ZMQ_PAIR);
        assert (rc == 0);

        //  Create two sockets for collecting monitor events
        void *client_mon = zmq_socket (ctx, ZMQ_PAIR);
        assert (client_mon);
        void *server_mon = zmq_socket (ctx, ZMQ_PAIR);
        assert (server_mon);

        //  Connect these to the inproc endpoints so they'll get events
        rc = zmq_connect (client_mon, "inproc://monitor-client");
        assert (rc == 0);
        rc = zmq_connect (server_mon, "inproc://monitor-server");
        assert (rc == 0);

        //  Now do a basic ping test
        rc = zmq_bind (server, "tcp://127.0.0.1:9998");
        assert (rc == 0);
        rc = zmq_connect (client, "tcp://127.0.0.1:9998");
        assert (rc == 0);
        bounce (client, server);

        //  Close client and server
        close_zero_linger (client);
        close_zero_linger (server);

        //  Now collect and check events from both sockets
        int64_t event = get_monitor_event (client_mon, NULL, NULL, NULL);
        if (event == ZMQ_EVENT_CONNECT_DELAYED)
            event = get_monitor_event (client_mon, NULL, NULL, NULL);
        assert (event == ZMQ_EVENT_CONNECTED);
        event = get_monitor_event (client_mon, NULL, NULL, NULL);
        assert (event == ZMQ_EVENT_MONITOR_STOPPED);

        //  This is the flow of server events
        event = get_monitor_event (server_mon, NULL, NULL, NULL);
        assert (event == ZMQ_EVENT_LISTENING);
        event = get_monitor_event (server_mon, NULL, NULL, NULL);
        assert (event == ZMQ_EVENT_ACCEPTED);
        event = get_monitor_event (server_mon, NULL, NULL, NULL);
        assert (event == ZMQ_EVENT_CLOSED);
        event = get_monitor_event (server_mon, NULL, NULL, NULL);
        assert (event == ZMQ_EVENT_MONITOR_STOPPED);

        //  Close down the sockets
        close_zero_linger (client_mon);
        close_zero_linger (server_mon);
        zmq_ctx_term (ctx);

        return 0;
    }

SEE ALSO

*zmq*(7)

AUTHORS

This page was written by the 0MQ community. To make a change please read the 0MQ Contribution Policy at http://www.zeromq.org/docs:contributing.

Author: dt

Created: 2022-02-21 Mon 12:30