staging: vchiq_core: introduce parse_message

The function parse_rx_slots is very longer. So move at least the message
parsing into a separate function to improve readability. In good case
the function returns the message payload length which is necessary to
move to the next message.

Signed-off-by: Stefan Wahren <stefan.wahren@i2se.com>
Link: https://lore.kernel.org/r/1621105859-30215-19-git-send-email-stefan.wahren@i2se.com
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
This commit is contained in:
Stefan Wahren
2021-05-15 21:10:57 +02:00
committed by Greg Kroah-Hartman
parent 1a64ab341d
commit 4c51210461

View File

@@ -1545,12 +1545,324 @@ bail_not_ready:
return 0;
}
/**
* parse_message() - parses a single message from the rx slot
* @state: vchiq state struct
* @header: message header
*
* Context: Process context
*
* Return:
* * >= 0 - size of the parsed message payload (without header)
* * -EINVAL - fatal error occurred, bail out is required
*/
static int
parse_message(struct vchiq_state *state, struct vchiq_header *header)
{
struct vchiq_service *service = NULL;
unsigned int localport, remoteport;
int msgid, size, type, ret = -EINVAL;
DEBUG_INITIALISE(state->local)
DEBUG_VALUE(PARSE_HEADER, (int)(long)header);
msgid = header->msgid;
DEBUG_VALUE(PARSE_MSGID, msgid);
size = header->size;
type = VCHIQ_MSG_TYPE(msgid);
localport = VCHIQ_MSG_DSTPORT(msgid);
remoteport = VCHIQ_MSG_SRCPORT(msgid);
if (type != VCHIQ_MSG_DATA)
VCHIQ_STATS_INC(state, ctrl_rx_count);
switch (type) {
case VCHIQ_MSG_OPENACK:
case VCHIQ_MSG_CLOSE:
case VCHIQ_MSG_DATA:
case VCHIQ_MSG_BULK_RX:
case VCHIQ_MSG_BULK_TX:
case VCHIQ_MSG_BULK_RX_DONE:
case VCHIQ_MSG_BULK_TX_DONE:
service = find_service_by_port(state, localport);
if ((!service ||
((service->remoteport != remoteport) &&
(service->remoteport != VCHIQ_PORT_FREE))) &&
(localport == 0) &&
(type == VCHIQ_MSG_CLOSE)) {
/*
* This could be a CLOSE from a client which
* hadn't yet received the OPENACK - look for
* the connected service
*/
if (service)
unlock_service(service);
service = get_connected_service(state,
remoteport);
if (service)
vchiq_log_warning(vchiq_core_log_level,
"%d: prs %s@%pK (%d->%d) - found connected service %d",
state->id, msg_type_str(type),
header, remoteport, localport,
service->localport);
}
if (!service) {
vchiq_log_error(vchiq_core_log_level,
"%d: prs %s@%pK (%d->%d) - invalid/closed service %d",
state->id, msg_type_str(type),
header, remoteport, localport,
localport);
goto skip_message;
}
break;
default:
break;
}
if (SRVTRACE_ENABLED(service, VCHIQ_LOG_INFO)) {
int svc_fourcc;
svc_fourcc = service
? service->base.fourcc
: VCHIQ_MAKE_FOURCC('?', '?', '?', '?');
vchiq_log_info(SRVTRACE_LEVEL(service),
"Rcvd Msg %s(%u) from %c%c%c%c s:%d d:%d len:%d",
msg_type_str(type), type,
VCHIQ_FOURCC_AS_4CHARS(svc_fourcc),
remoteport, localport, size);
if (size > 0)
vchiq_log_dump_mem("Rcvd", 0, header->data,
min(16, size));
}
if (((unsigned long)header & VCHIQ_SLOT_MASK) +
calc_stride(size) > VCHIQ_SLOT_SIZE) {
vchiq_log_error(vchiq_core_log_level,
"header %pK (msgid %x) - size %x too big for slot",
header, (unsigned int)msgid,
(unsigned int)size);
WARN(1, "oversized for slot\n");
}
switch (type) {
case VCHIQ_MSG_OPEN:
WARN_ON(!(VCHIQ_MSG_DSTPORT(msgid) == 0));
if (!parse_open(state, header))
goto bail_not_ready;
break;
case VCHIQ_MSG_OPENACK:
if (size >= sizeof(struct vchiq_openack_payload)) {
const struct vchiq_openack_payload *payload =
(struct vchiq_openack_payload *)
header->data;
service->peer_version = payload->version;
}
vchiq_log_info(vchiq_core_log_level,
"%d: prs OPENACK@%pK,%x (%d->%d) v:%d",
state->id, header, size, remoteport, localport,
service->peer_version);
if (service->srvstate == VCHIQ_SRVSTATE_OPENING) {
service->remoteport = remoteport;
vchiq_set_service_state(service,
VCHIQ_SRVSTATE_OPEN);
complete(&service->remove_event);
} else {
vchiq_log_error(vchiq_core_log_level,
"OPENACK received in state %s",
srvstate_names[service->srvstate]);
}
break;
case VCHIQ_MSG_CLOSE:
WARN_ON(size != 0); /* There should be no data */
vchiq_log_info(vchiq_core_log_level,
"%d: prs CLOSE@%pK (%d->%d)",
state->id, header, remoteport, localport);
mark_service_closing_internal(service, 1);
if (vchiq_close_service_internal(service,
1/*close_recvd*/) == VCHIQ_RETRY)
goto bail_not_ready;
vchiq_log_info(vchiq_core_log_level,
"Close Service %c%c%c%c s:%u d:%d",
VCHIQ_FOURCC_AS_4CHARS(service->base.fourcc),
service->localport,
service->remoteport);
break;
case VCHIQ_MSG_DATA:
vchiq_log_info(vchiq_core_log_level,
"%d: prs DATA@%pK,%x (%d->%d)",
state->id, header, size, remoteport, localport);
if ((service->remoteport == remoteport) &&
(service->srvstate == VCHIQ_SRVSTATE_OPEN)) {
header->msgid = msgid | VCHIQ_MSGID_CLAIMED;
claim_slot(state->rx_info);
DEBUG_TRACE(PARSE_LINE);
if (make_service_callback(service,
VCHIQ_MESSAGE_AVAILABLE, header,
NULL) == VCHIQ_RETRY) {
DEBUG_TRACE(PARSE_LINE);
goto bail_not_ready;
}
VCHIQ_SERVICE_STATS_INC(service, ctrl_rx_count);
VCHIQ_SERVICE_STATS_ADD(service, ctrl_rx_bytes,
size);
} else {
VCHIQ_STATS_INC(state, error_count);
}
break;
case VCHIQ_MSG_CONNECT:
vchiq_log_info(vchiq_core_log_level,
"%d: prs CONNECT@%pK", state->id, header);
state->version_common = ((struct vchiq_slot_zero *)
state->slot_data)->version;
complete(&state->connect);
break;
case VCHIQ_MSG_BULK_RX:
case VCHIQ_MSG_BULK_TX:
/*
* We should never receive a bulk request from the
* other side since we're not setup to perform as the
* master.
*/
WARN_ON(1);
break;
case VCHIQ_MSG_BULK_RX_DONE:
case VCHIQ_MSG_BULK_TX_DONE:
if ((service->remoteport == remoteport) &&
(service->srvstate != VCHIQ_SRVSTATE_FREE)) {
struct vchiq_bulk_queue *queue;
struct vchiq_bulk *bulk;
queue = (type == VCHIQ_MSG_BULK_RX_DONE) ?
&service->bulk_rx : &service->bulk_tx;
DEBUG_TRACE(PARSE_LINE);
if (mutex_lock_killable(&service->bulk_mutex)) {
DEBUG_TRACE(PARSE_LINE);
goto bail_not_ready;
}
if ((int)(queue->remote_insert -
queue->local_insert) >= 0) {
vchiq_log_error(vchiq_core_log_level,
"%d: prs %s@%pK (%d->%d) unexpected (ri=%d,li=%d)",
state->id, msg_type_str(type),
header, remoteport, localport,
queue->remote_insert,
queue->local_insert);
mutex_unlock(&service->bulk_mutex);
break;
}
if (queue->process != queue->remote_insert) {
pr_err("%s: p %x != ri %x\n",
__func__,
queue->process,
queue->remote_insert);
mutex_unlock(&service->bulk_mutex);
goto bail_not_ready;
}
bulk = &queue->bulks[
BULK_INDEX(queue->remote_insert)];
bulk->actual = *(int *)header->data;
queue->remote_insert++;
vchiq_log_info(vchiq_core_log_level,
"%d: prs %s@%pK (%d->%d) %x@%pad",
state->id, msg_type_str(type),
header, remoteport, localport,
bulk->actual, &bulk->data);
vchiq_log_trace(vchiq_core_log_level,
"%d: prs:%d %cx li=%x ri=%x p=%x",
state->id, localport,
(type == VCHIQ_MSG_BULK_RX_DONE) ?
'r' : 't',
queue->local_insert,
queue->remote_insert, queue->process);
DEBUG_TRACE(PARSE_LINE);
WARN_ON(queue->process == queue->local_insert);
vchiq_complete_bulk(bulk);
queue->process++;
mutex_unlock(&service->bulk_mutex);
DEBUG_TRACE(PARSE_LINE);
notify_bulks(service, queue, 1/*retry_poll*/);
DEBUG_TRACE(PARSE_LINE);
}
break;
case VCHIQ_MSG_PADDING:
vchiq_log_trace(vchiq_core_log_level,
"%d: prs PADDING@%pK,%x",
state->id, header, size);
break;
case VCHIQ_MSG_PAUSE:
/* If initiated, signal the application thread */
vchiq_log_trace(vchiq_core_log_level,
"%d: prs PAUSE@%pK,%x",
state->id, header, size);
if (state->conn_state == VCHIQ_CONNSTATE_PAUSED) {
vchiq_log_error(vchiq_core_log_level,
"%d: PAUSE received in state PAUSED",
state->id);
break;
}
if (state->conn_state != VCHIQ_CONNSTATE_PAUSE_SENT) {
/* Send a PAUSE in response */
if (queue_message(state, NULL,
VCHIQ_MAKE_MSG(VCHIQ_MSG_PAUSE, 0, 0),
NULL, NULL, 0, QMFLAGS_NO_MUTEX_UNLOCK)
== VCHIQ_RETRY)
goto bail_not_ready;
}
/* At this point slot_mutex is held */
vchiq_set_conn_state(state, VCHIQ_CONNSTATE_PAUSED);
break;
case VCHIQ_MSG_RESUME:
vchiq_log_trace(vchiq_core_log_level,
"%d: prs RESUME@%pK,%x",
state->id, header, size);
/* Release the slot mutex */
mutex_unlock(&state->slot_mutex);
vchiq_set_conn_state(state, VCHIQ_CONNSTATE_CONNECTED);
break;
case VCHIQ_MSG_REMOTE_USE:
vchiq_on_remote_use(state);
break;
case VCHIQ_MSG_REMOTE_RELEASE:
vchiq_on_remote_release(state);
break;
case VCHIQ_MSG_REMOTE_USE_ACTIVE:
break;
default:
vchiq_log_error(vchiq_core_log_level,
"%d: prs invalid msgid %x@%pK,%x",
state->id, msgid, header, size);
WARN(1, "invalid message\n");
break;
}
skip_message:
ret = size;
bail_not_ready:
if (service)
unlock_service(service);
return ret;
}
/* Called by the slot handler thread */
static void
parse_rx_slots(struct vchiq_state *state)
{
struct vchiq_shared_state *remote = state->remote;
struct vchiq_service *service = NULL;
int tx_pos;
DEBUG_INITIALISE(state->local)
@@ -1559,9 +1871,7 @@ parse_rx_slots(struct vchiq_state *state)
while (state->rx_pos != tx_pos) {
struct vchiq_header *header;
int msgid, size;
int type;
unsigned int localport, remoteport;
int size;
DEBUG_TRACE(PARSE_LINE);
if (!state->rx_data) {
@@ -1585,294 +1895,9 @@ parse_rx_slots(struct vchiq_state *state)
header = (struct vchiq_header *)(state->rx_data +
(state->rx_pos & VCHIQ_SLOT_MASK));
DEBUG_VALUE(PARSE_HEADER, (int)(long)header);
msgid = header->msgid;
DEBUG_VALUE(PARSE_MSGID, msgid);
size = header->size;
type = VCHIQ_MSG_TYPE(msgid);
localport = VCHIQ_MSG_DSTPORT(msgid);
remoteport = VCHIQ_MSG_SRCPORT(msgid);
if (type != VCHIQ_MSG_DATA)
VCHIQ_STATS_INC(state, ctrl_rx_count);
switch (type) {
case VCHIQ_MSG_OPENACK:
case VCHIQ_MSG_CLOSE:
case VCHIQ_MSG_DATA:
case VCHIQ_MSG_BULK_RX:
case VCHIQ_MSG_BULK_TX:
case VCHIQ_MSG_BULK_RX_DONE:
case VCHIQ_MSG_BULK_TX_DONE:
service = find_service_by_port(state, localport);
if ((!service ||
((service->remoteport != remoteport) &&
(service->remoteport != VCHIQ_PORT_FREE))) &&
(localport == 0) &&
(type == VCHIQ_MSG_CLOSE)) {
/*
* This could be a CLOSE from a client which
* hadn't yet received the OPENACK - look for
* the connected service
*/
if (service)
unlock_service(service);
service = get_connected_service(state,
remoteport);
if (service)
vchiq_log_warning(vchiq_core_log_level,
"%d: prs %s@%pK (%d->%d) - found connected service %d",
state->id, msg_type_str(type),
header, remoteport, localport,
service->localport);
}
if (!service) {
vchiq_log_error(vchiq_core_log_level,
"%d: prs %s@%pK (%d->%d) - invalid/closed service %d",
state->id, msg_type_str(type),
header, remoteport, localport,
localport);
goto skip_message;
}
break;
default:
break;
}
if (SRVTRACE_ENABLED(service, VCHIQ_LOG_INFO)) {
int svc_fourcc;
svc_fourcc = service
? service->base.fourcc
: VCHIQ_MAKE_FOURCC('?', '?', '?', '?');
vchiq_log_info(SRVTRACE_LEVEL(service),
"Rcvd Msg %s(%u) from %c%c%c%c s:%d d:%d len:%d",
msg_type_str(type), type,
VCHIQ_FOURCC_AS_4CHARS(svc_fourcc),
remoteport, localport, size);
if (size > 0)
vchiq_log_dump_mem("Rcvd", 0, header->data,
min(16, size));
}
if (((unsigned long)header & VCHIQ_SLOT_MASK) +
calc_stride(size) > VCHIQ_SLOT_SIZE) {
vchiq_log_error(vchiq_core_log_level,
"header %pK (msgid %x) - size %x too big for slot",
header, (unsigned int)msgid,
(unsigned int)size);
WARN(1, "oversized for slot\n");
}
switch (type) {
case VCHIQ_MSG_OPEN:
WARN_ON(!(VCHIQ_MSG_DSTPORT(msgid) == 0));
if (!parse_open(state, header))
goto bail_not_ready;
break;
case VCHIQ_MSG_OPENACK:
if (size >= sizeof(struct vchiq_openack_payload)) {
const struct vchiq_openack_payload *payload =
(struct vchiq_openack_payload *)
header->data;
service->peer_version = payload->version;
}
vchiq_log_info(vchiq_core_log_level,
"%d: prs OPENACK@%pK,%x (%d->%d) v:%d",
state->id, header, size, remoteport, localport,
service->peer_version);
if (service->srvstate == VCHIQ_SRVSTATE_OPENING) {
service->remoteport = remoteport;
vchiq_set_service_state(service,
VCHIQ_SRVSTATE_OPEN);
complete(&service->remove_event);
} else {
vchiq_log_error(vchiq_core_log_level,
"OPENACK received in state %s",
srvstate_names[service->srvstate]);
}
break;
case VCHIQ_MSG_CLOSE:
WARN_ON(size != 0); /* There should be no data */
vchiq_log_info(vchiq_core_log_level,
"%d: prs CLOSE@%pK (%d->%d)",
state->id, header, remoteport, localport);
mark_service_closing_internal(service, 1);
if (vchiq_close_service_internal(service,
1/*close_recvd*/) == VCHIQ_RETRY)
goto bail_not_ready;
vchiq_log_info(vchiq_core_log_level,
"Close Service %c%c%c%c s:%u d:%d",
VCHIQ_FOURCC_AS_4CHARS(service->base.fourcc),
service->localport,
service->remoteport);
break;
case VCHIQ_MSG_DATA:
vchiq_log_info(vchiq_core_log_level,
"%d: prs DATA@%pK,%x (%d->%d)",
state->id, header, size, remoteport, localport);
if ((service->remoteport == remoteport) &&
(service->srvstate == VCHIQ_SRVSTATE_OPEN)) {
header->msgid = msgid | VCHIQ_MSGID_CLAIMED;
claim_slot(state->rx_info);
DEBUG_TRACE(PARSE_LINE);
if (make_service_callback(service,
VCHIQ_MESSAGE_AVAILABLE, header,
NULL) == VCHIQ_RETRY) {
DEBUG_TRACE(PARSE_LINE);
goto bail_not_ready;
}
VCHIQ_SERVICE_STATS_INC(service, ctrl_rx_count);
VCHIQ_SERVICE_STATS_ADD(service, ctrl_rx_bytes,
size);
} else {
VCHIQ_STATS_INC(state, error_count);
}
break;
case VCHIQ_MSG_CONNECT:
vchiq_log_info(vchiq_core_log_level,
"%d: prs CONNECT@%pK", state->id, header);
state->version_common = ((struct vchiq_slot_zero *)
state->slot_data)->version;
complete(&state->connect);
break;
case VCHIQ_MSG_BULK_RX:
case VCHIQ_MSG_BULK_TX:
/*
* We should never receive a bulk request from the
* other side since we're not setup to perform as the
* master.
*/
WARN_ON(1);
break;
case VCHIQ_MSG_BULK_RX_DONE:
case VCHIQ_MSG_BULK_TX_DONE:
if ((service->remoteport == remoteport) &&
(service->srvstate != VCHIQ_SRVSTATE_FREE)) {
struct vchiq_bulk_queue *queue;
struct vchiq_bulk *bulk;
queue = (type == VCHIQ_MSG_BULK_RX_DONE) ?
&service->bulk_rx : &service->bulk_tx;
DEBUG_TRACE(PARSE_LINE);
if (mutex_lock_killable(&service->bulk_mutex)) {
DEBUG_TRACE(PARSE_LINE);
goto bail_not_ready;
}
if ((int)(queue->remote_insert -
queue->local_insert) >= 0) {
vchiq_log_error(vchiq_core_log_level,
"%d: prs %s@%pK (%d->%d) unexpected (ri=%d,li=%d)",
state->id, msg_type_str(type),
header, remoteport, localport,
queue->remote_insert,
queue->local_insert);
mutex_unlock(&service->bulk_mutex);
break;
}
if (queue->process != queue->remote_insert) {
pr_err("%s: p %x != ri %x\n",
__func__,
queue->process,
queue->remote_insert);
mutex_unlock(&service->bulk_mutex);
goto bail_not_ready;
}
bulk = &queue->bulks[
BULK_INDEX(queue->remote_insert)];
bulk->actual = *(int *)header->data;
queue->remote_insert++;
vchiq_log_info(vchiq_core_log_level,
"%d: prs %s@%pK (%d->%d) %x@%pad",
state->id, msg_type_str(type),
header, remoteport, localport,
bulk->actual, &bulk->data);
vchiq_log_trace(vchiq_core_log_level,
"%d: prs:%d %cx li=%x ri=%x p=%x",
state->id, localport,
(type == VCHIQ_MSG_BULK_RX_DONE) ?
'r' : 't',
queue->local_insert,
queue->remote_insert, queue->process);
DEBUG_TRACE(PARSE_LINE);
WARN_ON(queue->process == queue->local_insert);
vchiq_complete_bulk(bulk);
queue->process++;
mutex_unlock(&service->bulk_mutex);
DEBUG_TRACE(PARSE_LINE);
notify_bulks(service, queue, 1/*retry_poll*/);
DEBUG_TRACE(PARSE_LINE);
}
break;
case VCHIQ_MSG_PADDING:
vchiq_log_trace(vchiq_core_log_level,
"%d: prs PADDING@%pK,%x",
state->id, header, size);
break;
case VCHIQ_MSG_PAUSE:
/* If initiated, signal the application thread */
vchiq_log_trace(vchiq_core_log_level,
"%d: prs PAUSE@%pK,%x",
state->id, header, size);
if (state->conn_state == VCHIQ_CONNSTATE_PAUSED) {
vchiq_log_error(vchiq_core_log_level,
"%d: PAUSE received in state PAUSED",
state->id);
break;
}
if (state->conn_state != VCHIQ_CONNSTATE_PAUSE_SENT) {
/* Send a PAUSE in response */
if (queue_message(state, NULL,
VCHIQ_MAKE_MSG(VCHIQ_MSG_PAUSE, 0, 0),
NULL, NULL, 0, QMFLAGS_NO_MUTEX_UNLOCK)
== VCHIQ_RETRY)
goto bail_not_ready;
}
/* At this point slot_mutex is held */
vchiq_set_conn_state(state, VCHIQ_CONNSTATE_PAUSED);
break;
case VCHIQ_MSG_RESUME:
vchiq_log_trace(vchiq_core_log_level,
"%d: prs RESUME@%pK,%x",
state->id, header, size);
/* Release the slot mutex */
mutex_unlock(&state->slot_mutex);
vchiq_set_conn_state(state, VCHIQ_CONNSTATE_CONNECTED);
break;
case VCHIQ_MSG_REMOTE_USE:
vchiq_on_remote_use(state);
break;
case VCHIQ_MSG_REMOTE_RELEASE:
vchiq_on_remote_release(state);
break;
case VCHIQ_MSG_REMOTE_USE_ACTIVE:
break;
default:
vchiq_log_error(vchiq_core_log_level,
"%d: prs invalid msgid %x@%pK,%x",
state->id, msgid, header, size);
WARN(1, "invalid message\n");
break;
}
skip_message:
if (service) {
unlock_service(service);
service = NULL;
}
size = parse_message(state, header);
if (size < 0)
return;
state->rx_pos += calc_stride(size);
@@ -1887,10 +1912,6 @@ skip_message:
state->rx_data = NULL;
}
}
bail_not_ready:
if (service)
unlock_service(service);
}
/* Called by the slot handler thread */