kernel version은 v6.10.12 버전으로 분석을 진행합니다.
msq_obtain_object_check
아래의 코드를 이해하기 위해서 먼저 선언되어 있는 구조체들 먼저 분석하겠습니다.
static inline struct msg_queue *msq_obtain_object_check(struct ipc_namespace *ns,
int id)
{
struct kern_ipc_perm *ipcp = ipc_obtain_object_check(&msg_ids(ns), id);
if (IS_ERR(ipcp))
return ERR_CAST(ipcp);
return container_of(ipcp, struct msg_queue, q_perm);
}
C
복사
1.
struct ipc_namespace *ns
현재 실행중인 process의 IPC 리소스를 namespace 단위로 분리하기 위한 구조체
code
2.
int id
Message Queue의 id
3.
return static inline struct *msg_queue
struct msg_queue {
struct kern_ipc_perm q_perm;
time64_t q_stime; /* last msgsnd time */
time64_t q_rtime; /* last msgrcv time */
time64_t q_ctime; /* last change time */
unsigned long q_cbytes; /* current number of bytes on queue */
unsigned long q_qnum; /* number of messages in queue */
unsigned long q_qbytes; /* max number of bytes on queue */
struct pid *q_lspid; /* pid of last msgsnd */
struct pid *q_lrpid; /* last receive pid */
struct list_head q_messages;
struct list_head q_receivers;
struct list_head q_senders;
} __randomize_layout;
C
복사
msg_ids
지금 실행되는 프로세스의 ids[1]을 return합니다.
#define IPC_MSG_IDS 1
#define msg_ids(ns) ((ns)->ids[IPC_MSG_IDS])
C
복사
ids의 구조체를 보면 아래와 같습니다.
struct ipc_namespace {
struct ipc_ids ids[3];
...
}
struct ipc_ids {
int in_use;
unsigned short seq;
struct rw_semaphore rwsem;
struct idr ipcs_idr;
int max_idx;
int last_idx; /* For wrap around detection */
#ifdef CONFIG_CHECKPOINT_RESTORE
int next_id;
#endif
struct rhashtable key_ht;
};
C
복사
지금까지의 분석한 내용을 바탕으로 ipc_obtain_object_check 함수의 인자를 보면 Message Queue의 ipc_ids의 주소와 Message Queue의 id를 전달합니다.
struct kern_ipc_perm *ipcp = ipc_obtain_object_check(&msg_ids(ns), id);
C
복사
ipc_obtain_object_check
ipc_ids의 구조체는 위에서 봤으니 ipc_obtain_object_idr 함수로 들어가보겠습니다.
/**
* ipc_obtain_object_check
* @ids: ipc identifier set
* @id: ipc id to look for
*
* Similar to ipc_obtain_object_idr() but also checks the ipc object
* sequence number.
*
* Call inside the RCU critical section.
* The ipc object is *not* locked on exit.
*/
struct kern_ipc_perm *ipc_obtain_object_check(struct ipc_ids *ids, int id)
{
struct kern_ipc_perm *out = ipc_obtain_object_idr(ids, id);
if (IS_ERR(out))
goto out;
if (ipc_checkid(out, id))
return ERR_PTR(-EINVAL);
out:
return out;
}
C
복사
ipc_obtain_object_idr
/**
* ipc_obtain_object_idr
* @ids: ipc identifier set
* @id: ipc id to look for
*
* Look for an id in the ipc ids idr and return associated ipc object.
*
* Call inside the RCU critical section.
* The ipc object is *not* locked on exit.
*/
struct kern_ipc_perm *ipc_obtain_object_idr(struct ipc_ids *ids, int id)
{
struct kern_ipc_perm *out;
int idx = ipcid_to_idx(id);
out = idr_find(&ids->ipcs_idr, idx);
if (!out)
return ERR_PTR(-EINVAL);
return out;
}
C
복사
ipcid_to_idx
Message Queue의 id에서 index부분을 추출하기 위한 매크로입니다.
#define ipcid_to_idx(id) ((id) & IPCMNI_IDX_MASK)
C
복사
idr_find
해당 코드를 들어가기전 전달하는 구조체 먼저 보겠습니다.
out = idr_find(&ids->ipcs_idr, idx);
C
복사
ipcs_idr은 idr이라는 구조체입니다.
struct idr {
struct radix_tree_root idr_rt;
unsigned int idr_base;
unsigned int idr_next;
};
C
복사
idr_rt는 밑에서 보도록하고
•
idr_base
시작 offset(default: 0)
•
idr_next
다음 할당될 ID 후보
idr_find 함수를보면 idr→idr_rt의 주소와 Message Queue의 id를 내부 index로 변환한 id - idr→idr_base를 진행합니다.
/**
* idr_find() - Return pointer for given ID.
* @idr: IDR handle.
* @id: Pointer ID.
*
* Looks up the pointer associated with this ID. A %NULL pointer may
* indicate that @id is not allocated or that the %NULL pointer was
* associated with this ID.
*
* This function can be called under rcu_read_lock(), given that the leaf
* pointers lifetimes are correctly managed.
*
* Return: The pointer associated with this ID.
*/
void *idr_find(const struct idr *idr, unsigned long id)
{
return radix_tree_lookup(&idr->idr_rt, id - idr->idr_base);
}
EXPORT_SYMBOL_GPL(idr_find);
C
복사
radix_tree_lookup
__radix_tree_lookup 코드를 보기 전 root로 전달되는 radix_tree_root 먼저 보겠습니다.
/**
* radix_tree_lookup - perform lookup operation on a radix tree
* @root: radix tree root
* @index: index key
*
* Lookup the item at the position @index in the radix tree @root.
*
* This function can be called under rcu_read_lock, however the caller
* must manage lifetimes of leaf nodes (eg. RCU may also be used to free
* them safely). No RCU barriers are required to access or modify the
* returned item, however.
*/
void *radix_tree_lookup(const struct radix_tree_root *root, unsigned long index)
{
return __radix_tree_lookup(root, index, NULL, NULL);
}
EXPORT_SYMBOL(radix_tree_lookup);
C
복사
사실상 radix_tree_root는 void 타입으로 저장되어 있어 분석할 내용이 존재하지 않습니다.
#define radix_tree_root xarray
struct xarray {
spinlock_t xa_lock;
/* private: The rest of the data structure is not to be used directly. */
gfp_t xa_flags;
void __rcu * xa_head;
};
C
복사
해당 코드, 함수, 개념을 이해하기 위해서는 먼저 radix tree에 대한 이해가 필요합니다.
본 글을 radix tree에 대한 설명글이 아니므로, 간단하게만 설명하겠습니다.
#define radix_tree_root xarray
struct xarray {
spinlock_t xa_lock;
/* private: The rest of the data structure is not to be used directly. */
gfp_t xa_flags;
void __rcu * xa_head;
};
C
복사
radix_tree
특정 index(key)에 데이터(item)을 저장하는 자료구조이며, index를 이용해 item을 찾아가는 hash map과 비슷하지만 내부는 bit tree base로 작동한다.
실제로 void *xa_head를 설정해주는 부분을 보면 실 구조체를 볼 수 있습니다.
radix_tree_insert에서 radix tree 삽입 행위를 진행함으로 일단 인자를 먼저 보겠습니다.
int radix_tree_insert(struct radix_tree_root *root, unsigned long index,
void *item)
{
struct radix_tree_node *node;
void __rcu **slot;
int error;
BUG_ON(radix_tree_is_internal_node(item));
error = __radix_tree_create(root, index, &node, &slot);
if (error)
return error;
error = insert_entries(node, slot, item);
if (error < 0)
return error;
if (node) {
unsigned offset = get_slot_offset(node, slot);
BUG_ON(tag_get(node, 0, offset));
BUG_ON(tag_get(node, 1, offset));
BUG_ON(tag_get(node, 2, offset));
} else {
BUG_ON(root_tags_get(root));
}
return 0;
}
EXPORT_SYMBOL(radix_tree_insert);
C
복사
root는 위에서 본 radix tree의 최상단이 될 것이고 index는 64bit 정수이며, item은 저장할 데이터로 볼 수 있습니다.
코드를 간단하게 나눠보면
1.
__radix_tree_create
index로 접근할 tree에 삽입할 node 확보
2.
insert_entries
해당하는 node에 데이터 등록
3.
if(node)
무결성 검사
실제로 root tree에서 index까지 tree를 확장하는 __radix_tree_create 함수를 보겠습니다.
static int __radix_tree_create(struct radix_tree_root *root,
unsigned long index, struct radix_tree_node **nodep,
void __rcu ***slotp)
{
struct radix_tree_node *node = NULL, *child;
void __rcu **slot = (void __rcu **)&root->xa_head;
unsigned long maxindex;
unsigned int shift, offset = 0;
unsigned long max = index;
gfp_t gfp = root_gfp_mask(root);
shift = radix_tree_load_root(root, &child, &maxindex);
/* Make sure the tree is high enough. */
if (max > maxindex) {
int error = radix_tree_extend(root, gfp, max, shift);
if (error < 0)
return error;
shift = error;
child = rcu_dereference_raw(root->xa_head);
}
while (shift > 0) {
shift -= RADIX_TREE_MAP_SHIFT;
if (child == NULL) {
/* Have to add a child node. */
child = radix_tree_node_alloc(gfp, node, root, shift,
offset, 0, 0);
if (!child)
return -ENOMEM;
rcu_assign_pointer(*slot, node_to_entry(child));
if (node)
node->count++;
} else if (!radix_tree_is_internal_node(child))
break;
/* Go a level down */
node = entry_to_node(child);
offset = radix_tree_descend(node, &child, index);
slot = &node->slots[offset];
}
if (nodep)
*nodep = node;
if (slotp)
*slotp = slot;
return 0;
}
C
복사
조금 정리하고 일부분만 설명하면 아래의 코드와 동일합니다.
struct radix_tree_node *node = NULL, *child;
slot = (void __rcu **)&root->xa_head;
child = radix_tree_node_alloc(...);
rcu_assign_pointer(*slot, node_to_entry(child));
C
복사
rcu_assign_pointer 함수는 rcu 환경에서 write할 때 안정성을 보장하는 함수입니다.
즉 요약하면 아래의 코드와 동일합니다.
static inline void *node_to_entry(void *ptr)
{
return (void *)((unsigned long)ptr | RADIX_TREE_INTERNAL_NODE);
}
root->xa_head = node_to_entry(child);
C
복사
node_to_entry는 void *로 변환하는 함수이고, 실제로는 radix_tree_node_alloc의 return struct를 보면 xa_head의 구조체를 볼 수 있습니다.
radix_tree_node_alloc 함수의 분석은 하지 않고 radix_tree_node 구조체만 보겠습니다.
static struct radix_tree_node *
radix_tree_node_alloc(gfp_t gfp_mask, struct radix_tree_node *parent,
struct radix_tree_root *root,
unsigned int shift, unsigned int offset,
unsigned int count, unsigned int nr_values)
C
복사
정리해보면 xarray는 rcu를 지원하는 radix tree root를 위한 구조체로 보면 되며, xa_node는 rcu를 지원하는 radix tree node로 보면 됩니다.
#define radix_tree_node xa_node
struct xa_node {
unsigned char shift; /* Bits remaining in each slot */
unsigned char offset; /* Slot offset in parent */
unsigned char count; /* Total entry count */
unsigned char nr_values; /* Value entry count */
struct xa_node __rcu *parent; /* NULL at top of tree */
struct xarray *array; /* The array we belong to */
union {
struct list_head private_list; /* For tree user */
struct rcu_head rcu_head; /* Used when freeing node */
};
void __rcu *slots[XA_CHUNK_SIZE];
union {
unsigned long tags[XA_MAX_MARKS][XA_MARK_LONGS];
unsigned long marks[XA_MAX_MARKS][XA_MARK_LONGS];
};
};
C
복사
자 이제 이해한 내용을 바탕으로 radix_tree_lookup 함수부터 다시 분석을 진행하겠습니다.
__radix_tree_lookup
대략적인 분석만 해보겠습니다.
index를 기준으로 해당하는 radix tree에서 item(data)을 찾아서 반환합니다.
void *__radix_tree_lookup(const struct radix_tree_root *root,
unsigned long index, struct radix_tree_node **nodep,
void __rcu ***slotp)
{
struct radix_tree_node *node, *parent;
unsigned long maxindex;
void __rcu **slot;
restart:
parent = NULL;
slot = (void __rcu **)&root->xa_head;
radix_tree_load_root(root, &node, &maxindex);
if (index > maxindex)
return NULL;
while (radix_tree_is_internal_node(node)) {
unsigned offset;
parent = entry_to_node(node);
offset = radix_tree_descend(parent, &node, index);
slot = parent->slots + offset;
if (node == RADIX_TREE_RETRY)
goto restart;
if (parent->shift == 0)
break;
}
if (nodep)
*nodep = parent;
if (slotp)
*slotp = slot;
return node;
}
C
복사
여기서 root는 radix_tree_root 구조체인 idr_rt이며, idr_rt는 지금 프로세스의 ipc_ids[1]인 MSG IDS입니다.
idr_rt는 Message Queue의 radix tree이고, radix tree의 data로 Message Queue의 q_perm의 주소가 설정되어 있습니다.
msg_obtain_object_check
msg_obtain_object_check로 다시 돌아와서 보겠습니다.
ipc_obtain_object_check는 ipc에 등록된 radix tree에 msg id를 index로 msg_queue 구조체에 있는 kern_ipc_perm 구조체인 q_perm의 주소를 가져옵니다.
static inline struct msg_queue *msq_obtain_object_check(struct ipc_namespace *ns,
int id)
{
struct kern_ipc_perm *ipcp = ipc_obtain_object_check(&msg_ids(ns), id);
if (IS_ERR(ipcp))
return ERR_CAST(ipcp);
return container_of(ipcp, struct msg_queue, q_perm);
}
C
복사
다음으로 container_of 매크로를 보면 type으로 casting한 뒤 member 변수의 위치만큼 offset을 뺍니다.
즉 구조체 포인터를 반환하는 함수로 보면 됩니다.
/**
* container_of - cast a member of a structure out to the containing structure
* @ptr: the pointer to the member.
* @type: the type of the container struct this is embedded in.
* @member: the name of the member within the struct.
*
* WARNING: any const qualifier of @ptr is lost.
*/
#define container_of(ptr, type, member) ({ \
void *__mptr = (void *)(ptr); \
static_assert(__same_type(*(ptr), ((type *)0)->member) || \
__same_type(*(ptr), void), \
"pointer type mismatch in container_of()"); \
((type *)(__mptr - offsetof(type, member))); })
/**
C
복사
msg_queue의 첫 멤버 변수가 q_perm임으로 실제로 msg_queue를 return 한다고 보면 됩니다.
struct msg_queue {
struct kern_ipc_perm q_perm;
time64_t q_stime; /* last msgsnd time */
time64_t q_rtime; /* last msgrcv time */
time64_t q_ctime; /* last change time */
unsigned long q_cbytes; /* current number of bytes on queue */
unsigned long q_qnum; /* number of messages in queue */
unsigned long q_qbytes; /* max number of bytes on queue */
struct pid *q_lspid; /* pid of last msgsnd */
struct pid *q_lrpid; /* last receive pid */
struct list_head q_messages;
struct list_head q_receivers;
struct list_head q_senders;
} __randomize_layout;
C
복사
정리
msg_obtain_object_check 함수를 호출하면, ipc_obtain_object_check 함수로 현재 process의 ipc에 저장되어 있는 radix tree에 msg id를 index로 사용해 data로 저장되어 있는 msg_queue 구조체의 q_perm의 주소를 가져온 뒤 container_of 매크로를 통해서 msg_queue의 포인터를 가져옵니다.

