mirror of
https://gitlab.com/famedly/conduit.git
synced 2024-11-17 10:48:18 -07:00
Reduce turbofish usage
Should make the code a little bit easier to read.
This commit is contained in:
parent
1c4d9af586
commit
f2ef5677e0
@ -30,7 +30,7 @@ pub async fn set_global_account_data_route(
|
|||||||
) -> ConduitResult<set_global_account_data::Response> {
|
) -> ConduitResult<set_global_account_data::Response> {
|
||||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||||
|
|
||||||
let data = serde_json::from_str::<serde_json::Value>(body.data.get())
|
let data: serde_json::Value = serde_json::from_str(body.data.get())
|
||||||
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Data is invalid."))?;
|
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Data is invalid."))?;
|
||||||
|
|
||||||
let event_type = body.event_type.to_string();
|
let event_type = body.event_type.to_string();
|
||||||
@ -68,7 +68,7 @@ pub async fn set_room_account_data_route(
|
|||||||
) -> ConduitResult<set_room_account_data::Response> {
|
) -> ConduitResult<set_room_account_data::Response> {
|
||||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||||
|
|
||||||
let data = serde_json::from_str::<serde_json::Value>(body.data.get())
|
let data: serde_json::Value = serde_json::from_str(body.data.get())
|
||||||
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Data is invalid."))?;
|
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Data is invalid."))?;
|
||||||
|
|
||||||
let event_type = body.event_type.to_string();
|
let event_type = body.event_type.to_string();
|
||||||
@ -103,9 +103,9 @@ pub async fn get_global_account_data_route(
|
|||||||
) -> ConduitResult<get_global_account_data::Response> {
|
) -> ConduitResult<get_global_account_data::Response> {
|
||||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||||
|
|
||||||
let event = db
|
let event: Box<RawJsonValue> = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<Box<RawJsonValue>>(None, sender_user, body.event_type.clone().into())?
|
.get(None, sender_user, body.event_type.clone().into())?
|
||||||
.ok_or(Error::BadRequest(ErrorKind::NotFound, "Data not found."))?;
|
.ok_or(Error::BadRequest(ErrorKind::NotFound, "Data not found."))?;
|
||||||
|
|
||||||
let account_data = serde_json::from_str::<ExtractGlobalEventContent>(event.get())
|
let account_data = serde_json::from_str::<ExtractGlobalEventContent>(event.get())
|
||||||
@ -132,9 +132,9 @@ pub async fn get_room_account_data_route(
|
|||||||
) -> ConduitResult<get_room_account_data::Response> {
|
) -> ConduitResult<get_room_account_data::Response> {
|
||||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||||
|
|
||||||
let event = db
|
let event: Box<RawJsonValue> = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<Box<RawJsonValue>>(
|
.get(
|
||||||
Some(&body.room_id),
|
Some(&body.room_id),
|
||||||
sender_user,
|
sender_user,
|
||||||
body.event_type.clone().into(),
|
body.event_type.clone().into(),
|
||||||
|
@ -48,7 +48,7 @@ pub async fn get_context_route(
|
|||||||
))?
|
))?
|
||||||
.to_room_event();
|
.to_room_event();
|
||||||
|
|
||||||
let events_before = db
|
let events_before: Vec<_> = db
|
||||||
.rooms
|
.rooms
|
||||||
.pdus_until(sender_user, &body.room_id, base_token)?
|
.pdus_until(sender_user, &body.room_id, base_token)?
|
||||||
.take(
|
.take(
|
||||||
@ -58,19 +58,19 @@ pub async fn get_context_route(
|
|||||||
/ 2,
|
/ 2,
|
||||||
)
|
)
|
||||||
.filter_map(|r| r.ok()) // Remove buggy events
|
.filter_map(|r| r.ok()) // Remove buggy events
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let start_token = events_before
|
let start_token = events_before
|
||||||
.last()
|
.last()
|
||||||
.and_then(|(pdu_id, _)| db.rooms.pdu_count(pdu_id).ok())
|
.and_then(|(pdu_id, _)| db.rooms.pdu_count(pdu_id).ok())
|
||||||
.map(|count| count.to_string());
|
.map(|count| count.to_string());
|
||||||
|
|
||||||
let events_before = events_before
|
let events_before: Vec<_> = events_before
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(_, pdu)| pdu.to_room_event())
|
.map(|(_, pdu)| pdu.to_room_event())
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let events_after = db
|
let events_after: Vec<_> = db
|
||||||
.rooms
|
.rooms
|
||||||
.pdus_after(sender_user, &body.room_id, base_token)?
|
.pdus_after(sender_user, &body.room_id, base_token)?
|
||||||
.take(
|
.take(
|
||||||
@ -80,17 +80,17 @@ pub async fn get_context_route(
|
|||||||
/ 2,
|
/ 2,
|
||||||
)
|
)
|
||||||
.filter_map(|r| r.ok()) // Remove buggy events
|
.filter_map(|r| r.ok()) // Remove buggy events
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let end_token = events_after
|
let end_token = events_after
|
||||||
.last()
|
.last()
|
||||||
.and_then(|(pdu_id, _)| db.rooms.pdu_count(pdu_id).ok())
|
.and_then(|(pdu_id, _)| db.rooms.pdu_count(pdu_id).ok())
|
||||||
.map(|count| count.to_string());
|
.map(|count| count.to_string());
|
||||||
|
|
||||||
let events_after = events_after
|
let events_after: Vec<_> = events_after
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(_, pdu)| pdu.to_room_event())
|
.map(|(_, pdu)| pdu.to_room_event())
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let mut resp = get_context::Response::new();
|
let mut resp = get_context::Response::new();
|
||||||
resp.start = start_token;
|
resp.start = start_token;
|
||||||
|
@ -25,11 +25,11 @@ pub async fn get_devices_route(
|
|||||||
) -> ConduitResult<get_devices::Response> {
|
) -> ConduitResult<get_devices::Response> {
|
||||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||||
|
|
||||||
let devices = db
|
let devices: Vec<device::Device> = db
|
||||||
.users
|
.users
|
||||||
.all_devices_metadata(sender_user)
|
.all_devices_metadata(sender_user)
|
||||||
.filter_map(|r| r.ok()) // Filter out buggy devices
|
.filter_map(|r| r.ok()) // Filter out buggy devices
|
||||||
.collect::<Vec<device::Device>>();
|
.collect();
|
||||||
|
|
||||||
Ok(get_devices::Response { devices }.into())
|
Ok(get_devices::Response { devices }.into())
|
||||||
}
|
}
|
||||||
|
@ -223,7 +223,7 @@ pub(crate) async fn get_public_rooms_filtered_helper(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut all_rooms = db
|
let mut all_rooms: Vec<_> = db
|
||||||
.rooms
|
.rooms
|
||||||
.public_rooms()
|
.public_rooms()
|
||||||
.map(|room_id| {
|
.map(|room_id| {
|
||||||
@ -234,28 +234,22 @@ pub(crate) async fn get_public_rooms_filtered_helper(
|
|||||||
canonical_alias: db
|
canonical_alias: db
|
||||||
.rooms
|
.rooms
|
||||||
.room_state_get(&room_id, &EventType::RoomCanonicalAlias, "")?
|
.room_state_get(&room_id, &EventType::RoomCanonicalAlias, "")?
|
||||||
.map_or(Ok::<_, Error>(None), |s| {
|
.map_or(Ok(None), |s| {
|
||||||
Ok(
|
serde_json::from_str(s.content.get())
|
||||||
serde_json::from_str::<RoomCanonicalAliasEventContent>(s.content.get())
|
.map(|c: RoomCanonicalAliasEventContent| c.alias)
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
Error::bad_database(
|
Error::bad_database("Invalid canonical alias event in database.")
|
||||||
"Invalid canonical alias event in database.",
|
})
|
||||||
)
|
|
||||||
})?
|
|
||||||
.alias,
|
|
||||||
)
|
|
||||||
})?,
|
})?,
|
||||||
name: db
|
name: db
|
||||||
.rooms
|
.rooms
|
||||||
.room_state_get(&room_id, &EventType::RoomName, "")?
|
.room_state_get(&room_id, &EventType::RoomName, "")?
|
||||||
.map_or(Ok::<_, Error>(None), |s| {
|
.map_or(Ok(None), |s| {
|
||||||
Ok(
|
serde_json::from_str(s.content.get())
|
||||||
serde_json::from_str::<RoomNameEventContent>(s.content.get())
|
.map(|c: RoomNameEventContent| c.name)
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
Error::bad_database("Invalid room name event in database.")
|
Error::bad_database("Invalid room name event in database.")
|
||||||
})?
|
})
|
||||||
.name,
|
|
||||||
)
|
|
||||||
})?,
|
})?,
|
||||||
num_joined_members: db
|
num_joined_members: db
|
||||||
.rooms
|
.rooms
|
||||||
@ -269,56 +263,48 @@ pub(crate) async fn get_public_rooms_filtered_helper(
|
|||||||
topic: db
|
topic: db
|
||||||
.rooms
|
.rooms
|
||||||
.room_state_get(&room_id, &EventType::RoomTopic, "")?
|
.room_state_get(&room_id, &EventType::RoomTopic, "")?
|
||||||
.map_or(Ok::<_, Error>(None), |s| {
|
.map_or(Ok(None), |s| {
|
||||||
Ok(Some(
|
serde_json::from_str(s.content.get())
|
||||||
serde_json::from_str::<RoomTopicEventContent>(s.content.get())
|
.map(|c: RoomTopicEventContent| Some(c.topic))
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
Error::bad_database("Invalid room topic event in database.")
|
Error::bad_database("Invalid room topic event in database.")
|
||||||
})?
|
})
|
||||||
.topic,
|
|
||||||
))
|
|
||||||
})?,
|
})?,
|
||||||
world_readable: db
|
world_readable: db
|
||||||
.rooms
|
.rooms
|
||||||
.room_state_get(&room_id, &EventType::RoomHistoryVisibility, "")?
|
.room_state_get(&room_id, &EventType::RoomHistoryVisibility, "")?
|
||||||
.map_or(Ok::<_, Error>(false), |s| {
|
.map_or(Ok(false), |s| {
|
||||||
Ok(serde_json::from_str::<RoomHistoryVisibilityEventContent>(
|
serde_json::from_str(s.content.get())
|
||||||
s.content.get(),
|
.map(|c: RoomHistoryVisibilityEventContent| {
|
||||||
)
|
c.history_visibility == HistoryVisibility::WorldReadable
|
||||||
|
})
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
Error::bad_database(
|
Error::bad_database(
|
||||||
"Invalid room history visibility event in database.",
|
"Invalid room history visibility event in database.",
|
||||||
)
|
)
|
||||||
})?
|
})
|
||||||
.history_visibility
|
|
||||||
== HistoryVisibility::WorldReadable)
|
|
||||||
})?,
|
})?,
|
||||||
guest_can_join: db
|
guest_can_join: db
|
||||||
.rooms
|
.rooms
|
||||||
.room_state_get(&room_id, &EventType::RoomGuestAccess, "")?
|
.room_state_get(&room_id, &EventType::RoomGuestAccess, "")?
|
||||||
.map_or(Ok::<_, Error>(false), |s| {
|
.map_or(Ok(false), |s| {
|
||||||
Ok(
|
serde_json::from_str(s.content.get())
|
||||||
serde_json::from_str::<RoomGuestAccessEventContent>(s.content.get())
|
.map(|c: RoomGuestAccessEventContent| {
|
||||||
|
c.guest_access == GuestAccess::CanJoin
|
||||||
|
})
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
Error::bad_database(
|
Error::bad_database("Invalid room guest access event in database.")
|
||||||
"Invalid room guest access event in database.",
|
})
|
||||||
)
|
|
||||||
})?
|
|
||||||
.guest_access
|
|
||||||
== GuestAccess::CanJoin,
|
|
||||||
)
|
|
||||||
})?,
|
})?,
|
||||||
avatar_url: db
|
avatar_url: db
|
||||||
.rooms
|
.rooms
|
||||||
.room_state_get(&room_id, &EventType::RoomAvatar, "")?
|
.room_state_get(&room_id, &EventType::RoomAvatar, "")?
|
||||||
.map(|s| {
|
.map(|s| {
|
||||||
Ok::<_, Error>(
|
serde_json::from_str(s.content.get())
|
||||||
serde_json::from_str::<RoomAvatarEventContent>(s.content.get())
|
.map(|c: RoomAvatarEventContent| c.url)
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
Error::bad_database("Invalid room avatar event in database.")
|
Error::bad_database("Invalid room avatar event in database.")
|
||||||
})?
|
})
|
||||||
.url,
|
|
||||||
)
|
|
||||||
})
|
})
|
||||||
.transpose()?
|
.transpose()?
|
||||||
// url is now an Option<String> so we must flatten
|
// url is now an Option<String> so we must flatten
|
||||||
@ -359,17 +345,17 @@ pub(crate) async fn get_public_rooms_filtered_helper(
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
// We need to collect all, so we can sort by member count
|
// We need to collect all, so we can sort by member count
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
all_rooms.sort_by(|l, r| r.num_joined_members.cmp(&l.num_joined_members));
|
all_rooms.sort_by(|l, r| r.num_joined_members.cmp(&l.num_joined_members));
|
||||||
|
|
||||||
let total_room_count_estimate = (all_rooms.len() as u32).into();
|
let total_room_count_estimate = (all_rooms.len() as u32).into();
|
||||||
|
|
||||||
let chunk = all_rooms
|
let chunk: Vec<_> = all_rooms
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.skip(num_since as usize)
|
.skip(num_since as usize)
|
||||||
.take(limit as usize)
|
.take(limit as usize)
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let prev_batch = if num_since == 0 {
|
let prev_batch = if num_since == 0 {
|
||||||
None
|
None
|
||||||
|
@ -395,7 +395,7 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
|
|||||||
|
|
||||||
let mut failures = BTreeMap::new();
|
let mut failures = BTreeMap::new();
|
||||||
|
|
||||||
let mut futures = get_over_federation
|
let mut futures: FuturesUnordered<_> = get_over_federation
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(server, vec)| async move {
|
.map(|(server, vec)| async move {
|
||||||
let mut device_keys_input_fed = BTreeMap::new();
|
let mut device_keys_input_fed = BTreeMap::new();
|
||||||
@ -415,7 +415,7 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
|
|||||||
.await,
|
.await,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.collect::<FuturesUnordered<_>>();
|
.collect();
|
||||||
|
|
||||||
while let Some((server, response)) = futures.next().await {
|
while let Some((server, response)) = futures.next().await {
|
||||||
match response {
|
match response {
|
||||||
|
@ -56,19 +56,17 @@ pub async fn join_room_by_id_route(
|
|||||||
) -> ConduitResult<join_room_by_id::Response> {
|
) -> ConduitResult<join_room_by_id::Response> {
|
||||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||||
|
|
||||||
let mut servers = db
|
let mut servers: HashSet<_> = db
|
||||||
.rooms
|
.rooms
|
||||||
.invite_state(sender_user, &body.room_id)?
|
.invite_state(sender_user, &body.room_id)?
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|event| {
|
.filter_map(|event| serde_json::from_str(event.json().get()).ok())
|
||||||
serde_json::from_str::<serde_json::Value>(&event.json().to_string()).ok()
|
.filter_map(|event: serde_json::Value| event.get("sender").cloned())
|
||||||
})
|
|
||||||
.filter_map(|event| event.get("sender").cloned())
|
|
||||||
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
|
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
|
||||||
.filter_map(|sender| UserId::try_from(sender).ok())
|
.filter_map(|sender| UserId::try_from(sender).ok())
|
||||||
.map(|user| user.server_name().to_owned())
|
.map(|user| user.server_name().to_owned())
|
||||||
.collect::<HashSet<_>>();
|
.collect();
|
||||||
|
|
||||||
servers.insert(body.room_id.server_name().to_owned());
|
servers.insert(body.room_id.server_name().to_owned());
|
||||||
|
|
||||||
@ -105,19 +103,17 @@ pub async fn join_room_by_id_or_alias_route(
|
|||||||
|
|
||||||
let (servers, room_id) = match RoomId::try_from(body.room_id_or_alias.clone()) {
|
let (servers, room_id) = match RoomId::try_from(body.room_id_or_alias.clone()) {
|
||||||
Ok(room_id) => {
|
Ok(room_id) => {
|
||||||
let mut servers = db
|
let mut servers: HashSet<_> = db
|
||||||
.rooms
|
.rooms
|
||||||
.invite_state(sender_user, &room_id)?
|
.invite_state(sender_user, &room_id)?
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|event| {
|
.filter_map(|event| serde_json::from_str(event.json().get()).ok())
|
||||||
serde_json::from_str::<serde_json::Value>(&event.json().to_string()).ok()
|
.filter_map(|event: serde_json::Value| event.get("sender").cloned())
|
||||||
})
|
|
||||||
.filter_map(|event| event.get("sender").cloned())
|
|
||||||
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
|
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
|
||||||
.filter_map(|sender| UserId::try_from(sender).ok())
|
.filter_map(|sender| UserId::try_from(sender).ok())
|
||||||
.map(|user| user.server_name().to_owned())
|
.map(|user| user.server_name().to_owned())
|
||||||
.collect::<HashSet<_>>();
|
.collect();
|
||||||
|
|
||||||
servers.insert(room_id.server_name().to_owned());
|
servers.insert(room_id.server_name().to_owned());
|
||||||
(servers, room_id)
|
(servers, room_id)
|
||||||
@ -280,7 +276,7 @@ pub async fn ban_user_route(
|
|||||||
&body.user_id.to_string(),
|
&body.user_id.to_string(),
|
||||||
)?
|
)?
|
||||||
.map_or(
|
.map_or(
|
||||||
Ok::<_, Error>(RoomMemberEventContent {
|
Ok(RoomMemberEventContent {
|
||||||
membership: MembershipState::Ban,
|
membership: MembershipState::Ban,
|
||||||
displayname: db.users.displayname(&body.user_id)?,
|
displayname: db.users.displayname(&body.user_id)?,
|
||||||
avatar_url: db.users.avatar_url(&body.user_id)?,
|
avatar_url: db.users.avatar_url(&body.user_id)?,
|
||||||
@ -290,10 +286,12 @@ pub async fn ban_user_route(
|
|||||||
reason: None,
|
reason: None,
|
||||||
}),
|
}),
|
||||||
|event| {
|
|event| {
|
||||||
let mut event = serde_json::from_str::<RoomMemberEventContent>(event.content.get())
|
serde_json::from_str(event.content.get())
|
||||||
.map_err(|_| Error::bad_database("Invalid member event in database."))?;
|
.map(|event: RoomMemberEventContent| RoomMemberEventContent {
|
||||||
event.membership = MembershipState::Ban;
|
membership: MembershipState::Ban,
|
||||||
Ok(event)
|
..event
|
||||||
|
})
|
||||||
|
.map_err(|_| Error::bad_database("Invalid member event in database."))
|
||||||
},
|
},
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
@ -342,7 +340,7 @@ pub async fn unban_user_route(
|
|||||||
) -> ConduitResult<unban_user::Response> {
|
) -> ConduitResult<unban_user::Response> {
|
||||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||||
|
|
||||||
let mut event = serde_json::from_str::<RoomMemberEventContent>(
|
let mut event: RoomMemberEventContent = serde_json::from_str(
|
||||||
db.rooms
|
db.rooms
|
||||||
.room_state_get(
|
.room_state_get(
|
||||||
&body.room_id,
|
&body.room_id,
|
||||||
@ -577,10 +575,10 @@ async fn join_room_by_id_helper(
|
|||||||
_ => return Err(Error::BadServerResponse("Room version is not supported")),
|
_ => return Err(Error::BadServerResponse("Room version is not supported")),
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut join_event_stub =
|
let mut join_event_stub: CanonicalJsonObject =
|
||||||
serde_json::from_str::<CanonicalJsonObject>(make_join_response.event.get()).map_err(
|
serde_json::from_str(make_join_response.event.get()).map_err(|_| {
|
||||||
|_| Error::BadServerResponse("Invalid make_join event json received from server."),
|
Error::BadServerResponse("Invalid make_join event json received from server.")
|
||||||
)?;
|
})?;
|
||||||
|
|
||||||
// TODO: Is origin needed?
|
// TODO: Is origin needed?
|
||||||
join_event_stub.insert(
|
join_event_stub.insert(
|
||||||
@ -716,7 +714,7 @@ async fn join_room_by_id_helper(
|
|||||||
state
|
state
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(k, id)| db.rooms.compress_state_event(k, &id, &db.globals))
|
.map(|(k, id)| db.rooms.compress_state_event(k, &id, &db.globals))
|
||||||
.collect::<Result<HashSet<_>>>()?,
|
.collect::<Result<_>>()?,
|
||||||
db,
|
db,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
@ -787,7 +785,7 @@ fn validate_and_add_event_id(
|
|||||||
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
||||||
db: &Database,
|
db: &Database,
|
||||||
) -> Result<(EventId, CanonicalJsonObject)> {
|
) -> Result<(EventId, CanonicalJsonObject)> {
|
||||||
let mut value = serde_json::from_str::<CanonicalJsonObject>(pdu.get()).map_err(|e| {
|
let mut value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
|
||||||
error!("Invalid PDU in server response: {:?}: {:?}", pdu, e);
|
error!("Invalid PDU in server response: {:?}: {:?}", pdu, e);
|
||||||
Error::BadServerResponse("Invalid PDU in server response")
|
Error::BadServerResponse("Invalid PDU in server response")
|
||||||
})?;
|
})?;
|
||||||
@ -863,22 +861,21 @@ pub(crate) async fn invite_helper<'a>(
|
|||||||
);
|
);
|
||||||
let state_lock = mutex_state.lock().await;
|
let state_lock = mutex_state.lock().await;
|
||||||
|
|
||||||
let prev_events = db
|
let prev_events: Vec<_> = db
|
||||||
.rooms
|
.rooms
|
||||||
.get_pdu_leaves(room_id)?
|
.get_pdu_leaves(room_id)?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.take(20)
|
.take(20)
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let create_event = db
|
let create_event = db
|
||||||
.rooms
|
.rooms
|
||||||
.room_state_get(room_id, &EventType::RoomCreate, "")?;
|
.room_state_get(room_id, &EventType::RoomCreate, "")?;
|
||||||
|
|
||||||
let create_event_content = create_event
|
let create_event_content: Option<RoomCreateEventContent> = create_event
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|create_event| {
|
.map(|create_event| {
|
||||||
serde_json::from_str::<RoomCreateEventContent>(create_event.content.get())
|
serde_json::from_str(create_event.content.get()).map_err(|e| {
|
||||||
.map_err(|e| {
|
|
||||||
warn!("Invalid create event: {}", e);
|
warn!("Invalid create event: {}", e);
|
||||||
Error::bad_database("Invalid create event in db.")
|
Error::bad_database("Invalid create event in db.")
|
||||||
})
|
})
|
||||||
@ -1057,7 +1054,7 @@ pub(crate) async fn invite_helper<'a>(
|
|||||||
warn!("Server {} changed invite event, that's not allowed in the spec: ours: {:?}, theirs: {:?}", user_id.server_name(), pdu_json, value);
|
warn!("Server {} changed invite event, that's not allowed in the spec: ours: {:?}, theirs: {:?}", user_id.server_name(), pdu_json, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
let origin = serde_json::from_value::<Box<ServerName>>(
|
let origin: Box<ServerName> = serde_json::from_value(
|
||||||
serde_json::to_value(value.get("origin").ok_or(Error::BadRequest(
|
serde_json::to_value(value.get("origin").ok_or(Error::BadRequest(
|
||||||
ErrorKind::InvalidParam,
|
ErrorKind::InvalidParam,
|
||||||
"Event needs an origin field.",
|
"Event needs an origin field.",
|
||||||
|
@ -132,14 +132,11 @@ pub async fn get_message_events_route(
|
|||||||
let to = body.to.as_ref().map(|t| t.parse());
|
let to = body.to.as_ref().map(|t| t.parse());
|
||||||
|
|
||||||
// Use limit or else 10
|
// Use limit or else 10
|
||||||
let limit = body
|
let limit = body.limit.try_into().map_or(10_usize, |l: u32| l as usize);
|
||||||
.limit
|
|
||||||
.try_into()
|
|
||||||
.map_or(Ok::<_, Error>(10_usize), |l: u32| Ok(l as usize))?;
|
|
||||||
|
|
||||||
match body.dir {
|
match body.dir {
|
||||||
get_message_events::Direction::Forward => {
|
get_message_events::Direction::Forward => {
|
||||||
let events_after = db
|
let events_after: Vec<_> = db
|
||||||
.rooms
|
.rooms
|
||||||
.pdus_after(sender_user, &body.room_id, from)?
|
.pdus_after(sender_user, &body.room_id, from)?
|
||||||
.take(limit)
|
.take(limit)
|
||||||
@ -151,14 +148,14 @@ pub async fn get_message_events_route(
|
|||||||
.ok()
|
.ok()
|
||||||
})
|
})
|
||||||
.take_while(|&(k, _)| Some(Ok(k)) != to) // Stop at `to`
|
.take_while(|&(k, _)| Some(Ok(k)) != to) // Stop at `to`
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let end_token = events_after.last().map(|(count, _)| count.to_string());
|
let end_token = events_after.last().map(|(count, _)| count.to_string());
|
||||||
|
|
||||||
let events_after = events_after
|
let events_after: Vec<_> = events_after
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(_, pdu)| pdu.to_room_event())
|
.map(|(_, pdu)| pdu.to_room_event())
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let mut resp = get_message_events::Response::new();
|
let mut resp = get_message_events::Response::new();
|
||||||
resp.start = Some(body.from.to_owned());
|
resp.start = Some(body.from.to_owned());
|
||||||
@ -169,7 +166,7 @@ pub async fn get_message_events_route(
|
|||||||
Ok(resp.into())
|
Ok(resp.into())
|
||||||
}
|
}
|
||||||
get_message_events::Direction::Backward => {
|
get_message_events::Direction::Backward => {
|
||||||
let events_before = db
|
let events_before: Vec<_> = db
|
||||||
.rooms
|
.rooms
|
||||||
.pdus_until(sender_user, &body.room_id, from)?
|
.pdus_until(sender_user, &body.room_id, from)?
|
||||||
.take(limit)
|
.take(limit)
|
||||||
@ -181,14 +178,14 @@ pub async fn get_message_events_route(
|
|||||||
.ok()
|
.ok()
|
||||||
})
|
})
|
||||||
.take_while(|&(k, _)| Some(Ok(k)) != to) // Stop at `to`
|
.take_while(|&(k, _)| Some(Ok(k)) != to) // Stop at `to`
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let start_token = events_before.last().map(|(count, _)| count.to_string());
|
let start_token = events_before.last().map(|(count, _)| count.to_string());
|
||||||
|
|
||||||
let events_before = events_before
|
let events_before: Vec<_> = events_before
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(_, pdu)| pdu.to_room_event())
|
.map(|(_, pdu)| pdu.to_room_event())
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let mut resp = get_message_events::Response::new();
|
let mut resp = get_message_events::Response::new();
|
||||||
resp.start = Some(body.from.to_owned());
|
resp.start = Some(body.from.to_owned());
|
||||||
|
@ -8,7 +8,7 @@ use ruma::{
|
|||||||
set_pushrule_enabled, RuleKind,
|
set_pushrule_enabled, RuleKind,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
events::{push_rules, EventType},
|
events::{push_rules::PushRulesEvent, EventType},
|
||||||
push::{ConditionalPushRuleInit, PatternedPushRuleInit, SimplePushRuleInit},
|
push::{ConditionalPushRuleInit, PatternedPushRuleInit, SimplePushRuleInit},
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -29,9 +29,9 @@ pub async fn get_pushrules_all_route(
|
|||||||
) -> ConduitResult<get_pushrules_all::Response> {
|
) -> ConduitResult<get_pushrules_all::Response> {
|
||||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||||
|
|
||||||
let event = db
|
let event: PushRulesEvent = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<push_rules::PushRulesEvent>(None, sender_user, EventType::PushRules)?
|
.get(None, sender_user, EventType::PushRules)?
|
||||||
.ok_or(Error::BadRequest(
|
.ok_or(Error::BadRequest(
|
||||||
ErrorKind::NotFound,
|
ErrorKind::NotFound,
|
||||||
"PushRules event not found.",
|
"PushRules event not found.",
|
||||||
@ -57,9 +57,9 @@ pub async fn get_pushrule_route(
|
|||||||
) -> ConduitResult<get_pushrule::Response> {
|
) -> ConduitResult<get_pushrule::Response> {
|
||||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||||
|
|
||||||
let event = db
|
let event: PushRulesEvent = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<push_rules::PushRulesEvent>(None, sender_user, EventType::PushRules)?
|
.get(None, sender_user, EventType::PushRules)?
|
||||||
.ok_or(Error::BadRequest(
|
.ok_or(Error::BadRequest(
|
||||||
ErrorKind::NotFound,
|
ErrorKind::NotFound,
|
||||||
"PushRules event not found.",
|
"PushRules event not found.",
|
||||||
@ -122,9 +122,9 @@ pub async fn set_pushrule_route(
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut event = db
|
let mut event: PushRulesEvent = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<push_rules::PushRulesEvent>(None, sender_user, EventType::PushRules)?
|
.get(None, sender_user, EventType::PushRules)?
|
||||||
.ok_or(Error::BadRequest(
|
.ok_or(Error::BadRequest(
|
||||||
ErrorKind::NotFound,
|
ErrorKind::NotFound,
|
||||||
"PushRules event not found.",
|
"PushRules event not found.",
|
||||||
@ -222,9 +222,9 @@ pub async fn get_pushrule_actions_route(
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut event = db
|
let mut event: PushRulesEvent = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<push_rules::PushRulesEvent>(None, sender_user, EventType::PushRules)?
|
.get(None, sender_user, EventType::PushRules)?
|
||||||
.ok_or(Error::BadRequest(
|
.ok_or(Error::BadRequest(
|
||||||
ErrorKind::NotFound,
|
ErrorKind::NotFound,
|
||||||
"PushRules event not found.",
|
"PushRules event not found.",
|
||||||
@ -284,9 +284,9 @@ pub async fn set_pushrule_actions_route(
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut event = db
|
let mut event: PushRulesEvent = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<push_rules::PushRulesEvent>(None, sender_user, EventType::PushRules)?
|
.get(None, sender_user, EventType::PushRules)?
|
||||||
.ok_or(Error::BadRequest(
|
.ok_or(Error::BadRequest(
|
||||||
ErrorKind::NotFound,
|
ErrorKind::NotFound,
|
||||||
"PushRules event not found.",
|
"PushRules event not found.",
|
||||||
@ -356,9 +356,9 @@ pub async fn get_pushrule_enabled_route(
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut event = db
|
let mut event: PushRulesEvent = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<push_rules::PushRulesEvent>(None, sender_user, EventType::PushRules)?
|
.get(None, sender_user, EventType::PushRules)?
|
||||||
.ok_or(Error::BadRequest(
|
.ok_or(Error::BadRequest(
|
||||||
ErrorKind::NotFound,
|
ErrorKind::NotFound,
|
||||||
"PushRules event not found.",
|
"PushRules event not found.",
|
||||||
@ -420,9 +420,9 @@ pub async fn set_pushrule_enabled_route(
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut event = db
|
let mut event: PushRulesEvent = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<ruma::events::push_rules::PushRulesEvent>(None, sender_user, EventType::PushRules)?
|
.get(None, sender_user, EventType::PushRules)?
|
||||||
.ok_or(Error::BadRequest(
|
.ok_or(Error::BadRequest(
|
||||||
ErrorKind::NotFound,
|
ErrorKind::NotFound,
|
||||||
"PushRules event not found.",
|
"PushRules event not found.",
|
||||||
@ -497,9 +497,9 @@ pub async fn delete_pushrule_route(
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut event = db
|
let mut event: PushRulesEvent = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<push_rules::PushRulesEvent>(None, sender_user, EventType::PushRules)?
|
.get(None, sender_user, EventType::PushRules)?
|
||||||
.ok_or(Error::BadRequest(
|
.ok_or(Error::BadRequest(
|
||||||
ErrorKind::NotFound,
|
ErrorKind::NotFound,
|
||||||
"PushRules event not found.",
|
"PushRules event not found.",
|
||||||
|
@ -22,6 +22,7 @@ use ruma::{
|
|||||||
},
|
},
|
||||||
EventType,
|
EventType,
|
||||||
},
|
},
|
||||||
|
serde::JsonObject,
|
||||||
RoomAliasId, RoomId, RoomVersionId,
|
RoomAliasId, RoomId, RoomVersionId,
|
||||||
};
|
};
|
||||||
use serde_json::value::to_raw_value;
|
use serde_json::value::to_raw_value;
|
||||||
@ -175,9 +176,7 @@ pub async fn create_room_route(
|
|||||||
.expect("event is valid, we just created it");
|
.expect("event is valid, we just created it");
|
||||||
|
|
||||||
if let Some(power_level_content_override) = &body.power_level_content_override {
|
if let Some(power_level_content_override) = &body.power_level_content_override {
|
||||||
let json = serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(
|
let json: JsonObject = serde_json::from_str(power_level_content_override.json().get())
|
||||||
power_level_content_override.json().get(),
|
|
||||||
)
|
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
Error::BadRequest(ErrorKind::BadJson, "Invalid power_level_content_override.")
|
Error::BadRequest(ErrorKind::BadJson, "Invalid power_level_content_override.")
|
||||||
})?;
|
})?;
|
||||||
@ -605,7 +604,7 @@ pub async fn upgrade_room_route(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get the old room power levels
|
// Get the old room power levels
|
||||||
let mut power_levels_event_content = serde_json::from_str::<RoomPowerLevelsEventContent>(
|
let mut power_levels_event_content: RoomPowerLevelsEventContent = serde_json::from_str(
|
||||||
db.rooms
|
db.rooms
|
||||||
.room_state_get(&body.room_id, &EventType::RoomPowerLevels, "")?
|
.room_state_get(&body.room_id, &EventType::RoomPowerLevels, "")?
|
||||||
.ok_or_else(|| Error::bad_database("Found room without m.room.create event."))?
|
.ok_or_else(|| Error::bad_database("Found room without m.room.create event."))?
|
||||||
|
@ -74,7 +74,7 @@ pub async fn search_events_route(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let results = results
|
let results: Vec<_> = results
|
||||||
.iter()
|
.iter()
|
||||||
.map(|result| {
|
.map(|result| {
|
||||||
Ok::<_, Error>(SearchResult {
|
Ok::<_, Error>(SearchResult {
|
||||||
@ -95,7 +95,7 @@ pub async fn search_events_route(
|
|||||||
.filter_map(|r| r.ok())
|
.filter_map(|r| r.ok())
|
||||||
.skip(skip)
|
.skip(skip)
|
||||||
.take(limit)
|
.take(limit)
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let next_batch = if results.len() < limit as usize {
|
let next_batch = if results.len() < limit as usize {
|
||||||
None
|
None
|
||||||
@ -114,7 +114,7 @@ pub async fn search_events_route(
|
|||||||
.search_term
|
.search_term
|
||||||
.split_terminator(|c: char| !c.is_alphanumeric())
|
.split_terminator(|c: char| !c.is_alphanumeric())
|
||||||
.map(str::to_lowercase)
|
.map(str::to_lowercase)
|
||||||
.collect::<Vec<_>>(),
|
.collect(),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
.into())
|
.into())
|
||||||
|
@ -112,13 +112,13 @@ pub async fn get_state_events_route(
|
|||||||
db.rooms
|
db.rooms
|
||||||
.room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
|
.room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
|
||||||
.map(|event| {
|
.map(|event| {
|
||||||
serde_json::from_str::<RoomHistoryVisibilityEventContent>(event.content.get())
|
serde_json::from_str(event.content.get())
|
||||||
|
.map(|e: RoomHistoryVisibilityEventContent| e.history_visibility)
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
Error::bad_database(
|
Error::bad_database(
|
||||||
"Invalid room history visibility event in database.",
|
"Invalid room history visibility event in database.",
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.map(|e| e.history_visibility)
|
|
||||||
}),
|
}),
|
||||||
Some(Ok(HistoryVisibility::WorldReadable))
|
Some(Ok(HistoryVisibility::WorldReadable))
|
||||||
)
|
)
|
||||||
@ -164,13 +164,13 @@ pub async fn get_state_events_for_key_route(
|
|||||||
db.rooms
|
db.rooms
|
||||||
.room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
|
.room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
|
||||||
.map(|event| {
|
.map(|event| {
|
||||||
serde_json::from_str::<RoomHistoryVisibilityEventContent>(event.content.get())
|
serde_json::from_str(event.content.get())
|
||||||
|
.map(|e: RoomHistoryVisibilityEventContent| e.history_visibility)
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
Error::bad_database(
|
Error::bad_database(
|
||||||
"Invalid room history visibility event in database.",
|
"Invalid room history visibility event in database.",
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.map(|e| e.history_visibility)
|
|
||||||
}),
|
}),
|
||||||
Some(Ok(HistoryVisibility::WorldReadable))
|
Some(Ok(HistoryVisibility::WorldReadable))
|
||||||
)
|
)
|
||||||
@ -220,13 +220,13 @@ pub async fn get_state_events_for_empty_key_route(
|
|||||||
db.rooms
|
db.rooms
|
||||||
.room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
|
.room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
|
||||||
.map(|event| {
|
.map(|event| {
|
||||||
serde_json::from_str::<RoomHistoryVisibilityEventContent>(event.content.get())
|
serde_json::from_str(event.content.get())
|
||||||
|
.map(|e: RoomHistoryVisibilityEventContent| e.history_visibility)
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
Error::bad_database(
|
Error::bad_database(
|
||||||
"Invalid room history visibility event in database.",
|
"Invalid room history visibility event in database.",
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.map(|e| e.history_visibility)
|
|
||||||
}),
|
}),
|
||||||
Some(Ok(HistoryVisibility::WorldReadable))
|
Some(Ok(HistoryVisibility::WorldReadable))
|
||||||
)
|
)
|
||||||
|
@ -244,13 +244,13 @@ async fn sync_helper(
|
|||||||
});
|
});
|
||||||
|
|
||||||
// Take the last 10 events for the timeline
|
// Take the last 10 events for the timeline
|
||||||
let timeline_pdus = non_timeline_pdus
|
let timeline_pdus: Vec<_> = non_timeline_pdus
|
||||||
.by_ref()
|
.by_ref()
|
||||||
.take(10)
|
.take(10)
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.rev()
|
.rev()
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let send_notification_counts = !timeline_pdus.is_empty()
|
let send_notification_counts = !timeline_pdus.is_empty()
|
||||||
|| db
|
|| db
|
||||||
@ -290,9 +290,8 @@ async fn sync_helper(
|
|||||||
.filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
|
.filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
|
||||||
.filter(|(_, pdu)| pdu.kind == EventType::RoomMember)
|
.filter(|(_, pdu)| pdu.kind == EventType::RoomMember)
|
||||||
.map(|(_, pdu)| {
|
.map(|(_, pdu)| {
|
||||||
let content =
|
let content: RoomMemberEventContent =
|
||||||
serde_json::from_str::<RoomMemberEventContent>(pdu.content.get())
|
serde_json::from_str(pdu.content.get()).map_err(|_| {
|
||||||
.map_err(|_| {
|
|
||||||
Error::bad_database("Invalid member event in database.")
|
Error::bad_database("Invalid member event in database.")
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
@ -347,11 +346,11 @@ async fn sync_helper(
|
|||||||
let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
|
let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
|
||||||
|
|
||||||
let current_state_ids = db.rooms.state_full_ids(current_shortstatehash)?;
|
let current_state_ids = db.rooms.state_full_ids(current_shortstatehash)?;
|
||||||
let state_events = current_state_ids
|
let state_events: Vec<_> = current_state_ids
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(_, id)| db.rooms.get_pdu(id))
|
.map(|(_, id)| db.rooms.get_pdu(id))
|
||||||
.filter_map(|r| r.ok().flatten())
|
.filter_map(|r| r.ok().flatten())
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
(
|
(
|
||||||
heroes,
|
heroes,
|
||||||
@ -367,7 +366,7 @@ async fn sync_helper(
|
|||||||
// Incremental /sync
|
// Incremental /sync
|
||||||
let since_shortstatehash = since_shortstatehash.unwrap();
|
let since_shortstatehash = since_shortstatehash.unwrap();
|
||||||
|
|
||||||
let since_sender_member = db
|
let since_sender_member: Option<RoomMemberEventContent> = db
|
||||||
.rooms
|
.rooms
|
||||||
.state_get(
|
.state_get(
|
||||||
since_shortstatehash,
|
since_shortstatehash,
|
||||||
@ -375,7 +374,7 @@ async fn sync_helper(
|
|||||||
sender_user.as_str(),
|
sender_user.as_str(),
|
||||||
)?
|
)?
|
||||||
.and_then(|pdu| {
|
.and_then(|pdu| {
|
||||||
serde_json::from_str::<RoomMemberEventContent>(pdu.content.get())
|
serde_json::from_str(pdu.content.get())
|
||||||
.map_err(|_| Error::bad_database("Invalid PDU in database."))
|
.map_err(|_| Error::bad_database("Invalid PDU in database."))
|
||||||
.ok()
|
.ok()
|
||||||
});
|
});
|
||||||
@ -523,18 +522,18 @@ async fn sync_helper(
|
|||||||
Ok(Some(db.rooms.pdu_count(pdu_id)?.to_string()))
|
Ok(Some(db.rooms.pdu_count(pdu_id)?.to_string()))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let room_events = timeline_pdus
|
let room_events: Vec<_> = timeline_pdus
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(_, pdu)| pdu.to_sync_room_event())
|
.map(|(_, pdu)| pdu.to_sync_room_event())
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let mut edus = db
|
let mut edus: Vec<_> = db
|
||||||
.rooms
|
.rooms
|
||||||
.edus
|
.edus
|
||||||
.readreceipts_since(&room_id, since)
|
.readreceipts_since(&room_id, since)
|
||||||
.filter_map(|r| r.ok()) // Filter out buggy events
|
.filter_map(|r| r.ok()) // Filter out buggy events
|
||||||
.map(|(_, _, v)| v)
|
.map(|(_, _, v)| v)
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
if db.rooms.edus.last_typing_update(&room_id, &db.globals)? > since {
|
if db.rooms.edus.last_typing_update(&room_id, &db.globals)? > since {
|
||||||
edus.push(
|
edus.push(
|
||||||
@ -563,7 +562,7 @@ async fn sync_helper(
|
|||||||
.map_err(|_| Error::bad_database("Invalid account event in database."))
|
.map_err(|_| Error::bad_database("Invalid account event in database."))
|
||||||
.ok()
|
.ok()
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>(),
|
.collect(),
|
||||||
},
|
},
|
||||||
summary: sync_events::RoomSummary {
|
summary: sync_events::RoomSummary {
|
||||||
heroes,
|
heroes,
|
||||||
@ -628,7 +627,7 @@ async fn sync_helper(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut left_rooms = BTreeMap::new();
|
let mut left_rooms = BTreeMap::new();
|
||||||
let all_left_rooms = db.rooms.rooms_left(&sender_user).collect::<Vec<_>>();
|
let all_left_rooms: Vec<_> = db.rooms.rooms_left(&sender_user).collect();
|
||||||
for result in all_left_rooms {
|
for result in all_left_rooms {
|
||||||
let (room_id, left_state_events) = result?;
|
let (room_id, left_state_events) = result?;
|
||||||
|
|
||||||
@ -668,7 +667,7 @@ async fn sync_helper(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut invited_rooms = BTreeMap::new();
|
let mut invited_rooms = BTreeMap::new();
|
||||||
let all_invited_rooms = db.rooms.rooms_invited(&sender_user).collect::<Vec<_>>();
|
let all_invited_rooms: Vec<_> = db.rooms.rooms_invited(&sender_user).collect();
|
||||||
for result in all_invited_rooms {
|
for result in all_invited_rooms {
|
||||||
let (room_id, invite_state_events) = result?;
|
let (room_id, invite_state_events) = result?;
|
||||||
|
|
||||||
@ -750,7 +749,7 @@ async fn sync_helper(
|
|||||||
.map_err(|_| Error::bad_database("Invalid account event in database."))
|
.map_err(|_| Error::bad_database("Invalid account event in database."))
|
||||||
.ok()
|
.ok()
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>(),
|
.collect(),
|
||||||
},
|
},
|
||||||
device_lists: sync_events::DeviceLists {
|
device_lists: sync_events::DeviceLists {
|
||||||
changed: device_list_updates.into_iter().collect(),
|
changed: device_list_updates.into_iter().collect(),
|
||||||
|
@ -1,7 +1,10 @@
|
|||||||
use crate::{database::DatabaseGuard, ConduitResult, Ruma};
|
use crate::{database::DatabaseGuard, ConduitResult, Ruma};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::client::r0::tag::{create_tag, delete_tag, get_tags},
|
api::client::r0::tag::{create_tag, delete_tag, get_tags},
|
||||||
events::EventType,
|
events::{
|
||||||
|
tag::{TagEvent, TagEventContent},
|
||||||
|
EventType,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
@ -26,9 +29,9 @@ pub async fn update_tag_route(
|
|||||||
|
|
||||||
let mut tags_event = db
|
let mut tags_event = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<ruma::events::tag::TagEvent>(Some(&body.room_id), sender_user, EventType::Tag)?
|
.get(Some(&body.room_id), sender_user, EventType::Tag)?
|
||||||
.unwrap_or_else(|| ruma::events::tag::TagEvent {
|
.unwrap_or_else(|| TagEvent {
|
||||||
content: ruma::events::tag::TagEventContent {
|
content: TagEventContent {
|
||||||
tags: BTreeMap::new(),
|
tags: BTreeMap::new(),
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
@ -68,9 +71,9 @@ pub async fn delete_tag_route(
|
|||||||
|
|
||||||
let mut tags_event = db
|
let mut tags_event = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<ruma::events::tag::TagEvent>(Some(&body.room_id), sender_user, EventType::Tag)?
|
.get(Some(&body.room_id), sender_user, EventType::Tag)?
|
||||||
.unwrap_or_else(|| ruma::events::tag::TagEvent {
|
.unwrap_or_else(|| TagEvent {
|
||||||
content: ruma::events::tag::TagEventContent {
|
content: TagEventContent {
|
||||||
tags: BTreeMap::new(),
|
tags: BTreeMap::new(),
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
@ -108,9 +111,9 @@ pub async fn get_tags_route(
|
|||||||
Ok(get_tags::Response {
|
Ok(get_tags::Response {
|
||||||
tags: db
|
tags: db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<ruma::events::tag::TagEvent>(Some(&body.room_id), sender_user, EventType::Tag)?
|
.get(Some(&body.room_id), sender_user, EventType::Tag)?
|
||||||
.unwrap_or_else(|| ruma::events::tag::TagEvent {
|
.unwrap_or_else(|| TagEvent {
|
||||||
content: ruma::events::tag::TagEventContent {
|
content: TagEventContent {
|
||||||
tags: BTreeMap::new(),
|
tags: BTreeMap::new(),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
@ -699,7 +699,7 @@ impl Database {
|
|||||||
|
|
||||||
println!("Deleting starts");
|
println!("Deleting starts");
|
||||||
|
|
||||||
let batch2 = db
|
let batch2: Vec<_> = db
|
||||||
.rooms
|
.rooms
|
||||||
.tokenids
|
.tokenids
|
||||||
.iter()
|
.iter()
|
||||||
@ -711,7 +711,7 @@ impl Database {
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
for key in batch2 {
|
for key in batch2 {
|
||||||
println!("del");
|
println!("del");
|
||||||
|
@ -57,8 +57,7 @@ pub struct RotationHandler(broadcast::Sender<()>, broadcast::Receiver<()>);
|
|||||||
|
|
||||||
impl RotationHandler {
|
impl RotationHandler {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
let (s, r) = broadcast::channel::<()>(1);
|
let (s, r) = broadcast::channel(1);
|
||||||
|
|
||||||
Self(s, r)
|
Self(s, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -274,8 +273,8 @@ impl Globals {
|
|||||||
let signingkeys = self
|
let signingkeys = self
|
||||||
.server_signingkeys
|
.server_signingkeys
|
||||||
.get(origin.as_bytes())?
|
.get(origin.as_bytes())?
|
||||||
.and_then(|bytes| serde_json::from_slice::<ServerSigningKeys>(&bytes).ok())
|
.and_then(|bytes| serde_json::from_slice(&bytes).ok())
|
||||||
.map(|keys| {
|
.map(|keys: ServerSigningKeys| {
|
||||||
let mut tree = keys.verify_keys;
|
let mut tree = keys.verify_keys;
|
||||||
tree.extend(
|
tree.extend(
|
||||||
keys.old_verify_keys
|
keys.old_verify_keys
|
||||||
|
@ -94,15 +94,15 @@ impl KeyBackups {
|
|||||||
.iter_from(&last_possible_key, true)
|
.iter_from(&last_possible_key, true)
|
||||||
.take_while(move |(k, _)| k.starts_with(&prefix))
|
.take_while(move |(k, _)| k.starts_with(&prefix))
|
||||||
.next()
|
.next()
|
||||||
.map_or(Ok(None), |(key, _)| {
|
.map(|(key, _)| {
|
||||||
utils::string_from_bytes(
|
utils::string_from_bytes(
|
||||||
key.rsplit(|&b| b == 0xff)
|
key.rsplit(|&b| b == 0xff)
|
||||||
.next()
|
.next()
|
||||||
.expect("rsplit always returns an element"),
|
.expect("rsplit always returns an element"),
|
||||||
)
|
)
|
||||||
.map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))
|
.map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))
|
||||||
.map(Some)
|
|
||||||
})
|
})
|
||||||
|
.transpose()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_latest_backup(&self, user_id: &UserId) -> Result<Option<(String, BackupAlgorithm)>> {
|
pub fn get_latest_backup(&self, user_id: &UserId) -> Result<Option<(String, BackupAlgorithm)>> {
|
||||||
@ -115,7 +115,7 @@ impl KeyBackups {
|
|||||||
.iter_from(&last_possible_key, true)
|
.iter_from(&last_possible_key, true)
|
||||||
.take_while(move |(k, _)| k.starts_with(&prefix))
|
.take_while(move |(k, _)| k.starts_with(&prefix))
|
||||||
.next()
|
.next()
|
||||||
.map_or(Ok(None), |(key, value)| {
|
.map(|(key, value)| {
|
||||||
let version = utils::string_from_bytes(
|
let version = utils::string_from_bytes(
|
||||||
key.rsplit(|&b| b == 0xff)
|
key.rsplit(|&b| b == 0xff)
|
||||||
.next()
|
.next()
|
||||||
@ -123,13 +123,14 @@ impl KeyBackups {
|
|||||||
)
|
)
|
||||||
.map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))?;
|
.map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))?;
|
||||||
|
|
||||||
Ok(Some((
|
Ok((
|
||||||
version,
|
version,
|
||||||
serde_json::from_slice(&value).map_err(|_| {
|
serde_json::from_slice(&value).map_err(|_| {
|
||||||
Error::bad_database("Algorithm in backupid_algorithm is invalid.")
|
Error::bad_database("Algorithm in backupid_algorithm is invalid.")
|
||||||
})?,
|
})?,
|
||||||
)))
|
))
|
||||||
})
|
})
|
||||||
|
.transpose()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_backup(&self, user_id: &UserId, version: &str) -> Result<Option<BackupAlgorithm>> {
|
pub fn get_backup(&self, user_id: &UserId, version: &str) -> Result<Option<BackupAlgorithm>> {
|
||||||
|
@ -13,13 +13,16 @@ use rocket::http::RawStr;
|
|||||||
use ruma::{
|
use ruma::{
|
||||||
api::{client::error::ErrorKind, federation},
|
api::{client::error::ErrorKind, federation},
|
||||||
events::{
|
events::{
|
||||||
ignored_user_list, push_rules,
|
direct::DirectEvent,
|
||||||
|
ignored_user_list::IgnoredUserListEvent,
|
||||||
|
push_rules::PushRulesEvent,
|
||||||
room::{
|
room::{
|
||||||
create::RoomCreateEventContent,
|
create::RoomCreateEventContent,
|
||||||
member::{MembershipState, RoomMemberEventContent},
|
member::{MembershipState, RoomMemberEventContent},
|
||||||
message::RoomMessageEventContent,
|
message::RoomMessageEventContent,
|
||||||
power_levels::RoomPowerLevelsEventContent,
|
power_levels::RoomPowerLevelsEventContent,
|
||||||
},
|
},
|
||||||
|
tag::TagEvent,
|
||||||
AnyStrippedStateEvent, AnySyncStateEvent, EventType,
|
AnyStrippedStateEvent, AnySyncStateEvent, EventType,
|
||||||
},
|
},
|
||||||
push::{Action, Ruleset, Tweak},
|
push::{Action, Ruleset, Tweak},
|
||||||
@ -218,16 +221,16 @@ impl Rooms {
|
|||||||
self.eventid_shorteventid
|
self.eventid_shorteventid
|
||||||
.get(event_id.as_bytes())?
|
.get(event_id.as_bytes())?
|
||||||
.map_or(Ok(None), |shorteventid| {
|
.map_or(Ok(None), |shorteventid| {
|
||||||
self.shorteventid_shortstatehash.get(&shorteventid)?.map_or(
|
self.shorteventid_shortstatehash
|
||||||
Ok::<_, Error>(None),
|
.get(&shorteventid)?
|
||||||
|bytes| {
|
.map(|bytes| {
|
||||||
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
|
utils::u64_from_bytes(&bytes).map_err(|_| {
|
||||||
Error::bad_database(
|
Error::bad_database(
|
||||||
"Invalid shortstatehash bytes in shorteventid_shortstatehash",
|
"Invalid shortstatehash bytes in shorteventid_shortstatehash",
|
||||||
)
|
)
|
||||||
})?))
|
})
|
||||||
},
|
})
|
||||||
)
|
.transpose()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -369,16 +372,16 @@ impl Rooms {
|
|||||||
|
|
||||||
let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last()
|
let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last()
|
||||||
{
|
{
|
||||||
let statediffnew = new_state_ids_compressed
|
let statediffnew: HashSet<_> = new_state_ids_compressed
|
||||||
.difference(&parent_stateinfo.1)
|
.difference(&parent_stateinfo.1)
|
||||||
.copied()
|
.copied()
|
||||||
.collect::<HashSet<_>>();
|
.collect();
|
||||||
|
|
||||||
let statediffremoved = parent_stateinfo
|
let statediffremoved: HashSet<_> = parent_stateinfo
|
||||||
.1
|
.1
|
||||||
.difference(&new_state_ids_compressed)
|
.difference(&new_state_ids_compressed)
|
||||||
.copied()
|
.copied()
|
||||||
.collect::<HashSet<_>>();
|
.collect();
|
||||||
|
|
||||||
(statediffnew, statediffremoved)
|
(statediffnew, statediffremoved)
|
||||||
} else {
|
} else {
|
||||||
@ -409,7 +412,7 @@ impl Rooms {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let pdu = match serde_json::from_str::<PduEvent>(
|
let pdu: PduEvent = match serde_json::from_str(
|
||||||
&serde_json::to_string(&pdu).expect("CanonicalJsonObj can be serialized to JSON"),
|
&serde_json::to_string(&pdu).expect("CanonicalJsonObj can be serialized to JSON"),
|
||||||
) {
|
) {
|
||||||
Ok(pdu) => pdu,
|
Ok(pdu) => pdu,
|
||||||
@ -980,7 +983,8 @@ impl Rooms {
|
|||||||
pub fn get_pdu_count(&self, event_id: &EventId) -> Result<Option<u64>> {
|
pub fn get_pdu_count(&self, event_id: &EventId) -> Result<Option<u64>> {
|
||||||
self.eventid_pduid
|
self.eventid_pduid
|
||||||
.get(event_id.as_bytes())?
|
.get(event_id.as_bytes())?
|
||||||
.map_or(Ok(None), |pdu_id| self.pdu_count(&pdu_id).map(Some))
|
.map(|pdu_id| self.pdu_count(&pdu_id))
|
||||||
|
.transpose()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
@ -1008,7 +1012,7 @@ impl Rooms {
|
|||||||
pub fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
|
pub fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
|
||||||
self.eventid_pduid
|
self.eventid_pduid
|
||||||
.get(event_id.as_bytes())?
|
.get(event_id.as_bytes())?
|
||||||
.map_or_else::<Result<_>, _, _>(
|
.map_or_else(
|
||||||
|| self.eventid_outlierpdu.get(event_id.as_bytes()),
|
|| self.eventid_outlierpdu.get(event_id.as_bytes()),
|
||||||
|pduid| {
|
|pduid| {
|
||||||
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
|
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
|
||||||
@ -1041,14 +1045,12 @@ impl Rooms {
|
|||||||
) -> Result<Option<CanonicalJsonObject>> {
|
) -> Result<Option<CanonicalJsonObject>> {
|
||||||
self.eventid_pduid
|
self.eventid_pduid
|
||||||
.get(event_id.as_bytes())?
|
.get(event_id.as_bytes())?
|
||||||
.map_or_else::<Result<_>, _, _>(
|
.map(|pduid| {
|
||||||
|| Ok(None),
|
self.pduid_pdu
|
||||||
|pduid| {
|
.get(&pduid)?
|
||||||
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
|
.ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid."))
|
||||||
Error::bad_database("Invalid pduid in eventid_pduid.")
|
})
|
||||||
})?))
|
.transpose()?
|
||||||
},
|
|
||||||
)?
|
|
||||||
.map(|pdu| {
|
.map(|pdu| {
|
||||||
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
|
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
|
||||||
})
|
})
|
||||||
@ -1058,9 +1060,7 @@ impl Rooms {
|
|||||||
/// Returns the pdu's id.
|
/// Returns the pdu's id.
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<Vec<u8>>> {
|
pub fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<Vec<u8>>> {
|
||||||
self.eventid_pduid
|
self.eventid_pduid.get(event_id.as_bytes())
|
||||||
.get(event_id.as_bytes())?
|
|
||||||
.map_or(Ok(None), |pdu_id| Ok(Some(pdu_id)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the pdu.
|
/// Returns the pdu.
|
||||||
@ -1070,14 +1070,12 @@ impl Rooms {
|
|||||||
pub fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
|
pub fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
|
||||||
self.eventid_pduid
|
self.eventid_pduid
|
||||||
.get(event_id.as_bytes())?
|
.get(event_id.as_bytes())?
|
||||||
.map_or_else::<Result<_>, _, _>(
|
.map(|pduid| {
|
||||||
|| Ok(None),
|
self.pduid_pdu
|
||||||
|pduid| {
|
.get(&pduid)?
|
||||||
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
|
.ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid."))
|
||||||
Error::bad_database("Invalid pduid in eventid_pduid.")
|
})
|
||||||
})?))
|
.transpose()?
|
||||||
},
|
|
||||||
)?
|
|
||||||
.map(|pdu| {
|
.map(|pdu| {
|
||||||
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
|
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
|
||||||
})
|
})
|
||||||
@ -1096,11 +1094,8 @@ impl Rooms {
|
|||||||
if let Some(pdu) = self
|
if let Some(pdu) = self
|
||||||
.eventid_pduid
|
.eventid_pduid
|
||||||
.get(event_id.as_bytes())?
|
.get(event_id.as_bytes())?
|
||||||
.map_or_else::<Result<_>, _, _>(
|
.map_or_else(
|
||||||
|| {
|
|| self.eventid_outlierpdu.get(event_id.as_bytes()),
|
||||||
let r = self.eventid_outlierpdu.get(event_id.as_bytes());
|
|
||||||
r
|
|
||||||
},
|
|
||||||
|pduid| {
|
|pduid| {
|
||||||
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
|
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
|
||||||
Error::bad_database("Invalid pduid in eventid_pduid.")
|
Error::bad_database("Invalid pduid in eventid_pduid.")
|
||||||
@ -1363,8 +1358,8 @@ impl Rooms {
|
|||||||
|
|
||||||
let rules_for_user = db
|
let rules_for_user = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<push_rules::PushRulesEvent>(None, user, EventType::PushRules)?
|
.get(None, user, EventType::PushRules)?
|
||||||
.map(|ev| ev.content.global)
|
.map(|ev: PushRulesEvent| ev.content.global)
|
||||||
.unwrap_or_else(|| Ruleset::server_default(user));
|
.unwrap_or_else(|| Ruleset::server_default(user));
|
||||||
|
|
||||||
let mut highlight = false;
|
let mut highlight = false;
|
||||||
@ -1490,11 +1485,11 @@ impl Rooms {
|
|||||||
{
|
{
|
||||||
let mut lines = body.lines();
|
let mut lines = body.lines();
|
||||||
let command_line = lines.next().expect("each string has at least one line");
|
let command_line = lines.next().expect("each string has at least one line");
|
||||||
let body = lines.collect::<Vec<_>>();
|
let body: Vec<_> = lines.collect();
|
||||||
|
|
||||||
let mut parts = command_line.split_whitespace().skip(1);
|
let mut parts = command_line.split_whitespace().skip(1);
|
||||||
if let Some(command) = parts.next() {
|
if let Some(command) = parts.next() {
|
||||||
let args = parts.collect::<Vec<_>>();
|
let args: Vec<_> = parts.collect();
|
||||||
|
|
||||||
match command {
|
match command {
|
||||||
"register_appservice" => {
|
"register_appservice" => {
|
||||||
@ -1771,16 +1766,16 @@ impl Rooms {
|
|||||||
|
|
||||||
let (statediffnew, statediffremoved) =
|
let (statediffnew, statediffremoved) =
|
||||||
if let Some(parent_stateinfo) = states_parents.last() {
|
if let Some(parent_stateinfo) = states_parents.last() {
|
||||||
let statediffnew = state_ids_compressed
|
let statediffnew: HashSet<_> = state_ids_compressed
|
||||||
.difference(&parent_stateinfo.1)
|
.difference(&parent_stateinfo.1)
|
||||||
.copied()
|
.copied()
|
||||||
.collect::<HashSet<_>>();
|
.collect();
|
||||||
|
|
||||||
let statediffremoved = parent_stateinfo
|
let statediffremoved: HashSet<_> = parent_stateinfo
|
||||||
.1
|
.1
|
||||||
.difference(&state_ids_compressed)
|
.difference(&state_ids_compressed)
|
||||||
.copied()
|
.copied()
|
||||||
.collect::<HashSet<_>>();
|
.collect();
|
||||||
|
|
||||||
(statediffnew, statediffremoved)
|
(statediffnew, statediffremoved)
|
||||||
} else {
|
} else {
|
||||||
@ -2363,19 +2358,16 @@ impl Rooms {
|
|||||||
// Check if the room has a predecessor
|
// Check if the room has a predecessor
|
||||||
if let Some(predecessor) = self
|
if let Some(predecessor) = self
|
||||||
.room_state_get(room_id, &EventType::RoomCreate, "")?
|
.room_state_get(room_id, &EventType::RoomCreate, "")?
|
||||||
.and_then(|create| {
|
.and_then(|create| serde_json::from_str(create.content.get()).ok())
|
||||||
serde_json::from_str::<RoomCreateEventContent>(create.content.get())
|
.and_then(|content: RoomCreateEventContent| content.predecessor)
|
||||||
.ok()
|
|
||||||
})
|
|
||||||
.and_then(|content| content.predecessor)
|
|
||||||
{
|
{
|
||||||
// Copy user settings from predecessor to the current room:
|
// Copy user settings from predecessor to the current room:
|
||||||
// - Push rules
|
// - Push rules
|
||||||
//
|
//
|
||||||
// TODO: finish this once push rules are implemented.
|
// TODO: finish this once push rules are implemented.
|
||||||
//
|
//
|
||||||
// let mut push_rules_event_content = account_data
|
// let mut push_rules_event_content: PushRulesEvent = account_data
|
||||||
// .get::<ruma::events::push_rules::PushRulesEvent>(
|
// .get(
|
||||||
// None,
|
// None,
|
||||||
// user_id,
|
// user_id,
|
||||||
// EventType::PushRules,
|
// EventType::PushRules,
|
||||||
@ -2395,13 +2387,11 @@ impl Rooms {
|
|||||||
// .ok();
|
// .ok();
|
||||||
|
|
||||||
// Copy old tags to new room
|
// Copy old tags to new room
|
||||||
if let Some(tag_event) =
|
if let Some(tag_event) = db.account_data.get::<TagEvent>(
|
||||||
db.account_data.get::<ruma::events::tag::TagEvent>(
|
|
||||||
Some(&predecessor.room_id),
|
Some(&predecessor.room_id),
|
||||||
user_id,
|
user_id,
|
||||||
EventType::Tag,
|
EventType::Tag,
|
||||||
)?
|
)? {
|
||||||
{
|
|
||||||
db.account_data
|
db.account_data
|
||||||
.update(
|
.update(
|
||||||
Some(room_id),
|
Some(room_id),
|
||||||
@ -2415,11 +2405,8 @@ impl Rooms {
|
|||||||
|
|
||||||
// Copy direct chat flag
|
// Copy direct chat flag
|
||||||
if let Some(mut direct_event) =
|
if let Some(mut direct_event) =
|
||||||
db.account_data.get::<ruma::events::direct::DirectEvent>(
|
db.account_data
|
||||||
None,
|
.get::<DirectEvent>(None, user_id, EventType::Direct)?
|
||||||
user_id,
|
|
||||||
EventType::Direct,
|
|
||||||
)?
|
|
||||||
{
|
{
|
||||||
let mut room_ids_updated = false;
|
let mut room_ids_updated = false;
|
||||||
|
|
||||||
@ -2458,7 +2445,7 @@ impl Rooms {
|
|||||||
// We want to know if the sender is ignored by the receiver
|
// We want to know if the sender is ignored by the receiver
|
||||||
let is_ignored = db
|
let is_ignored = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<ignored_user_list::IgnoredUserListEvent>(
|
.get::<IgnoredUserListEvent>(
|
||||||
None, // Ignored users are in global account data
|
None, // Ignored users are in global account data
|
||||||
user_id, // Receiver
|
user_id, // Receiver
|
||||||
EventType::IgnoredUserList,
|
EventType::IgnoredUserList,
|
||||||
@ -2712,7 +2699,7 @@ impl Rooms {
|
|||||||
);
|
);
|
||||||
let state_lock = mutex_state.lock().await;
|
let state_lock = mutex_state.lock().await;
|
||||||
|
|
||||||
let mut event = serde_json::from_str::<RoomMemberEventContent>(
|
let mut event: RoomMemberEventContent = serde_json::from_str(
|
||||||
self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())?
|
self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())?
|
||||||
.ok_or(Error::BadRequest(
|
.ok_or(Error::BadRequest(
|
||||||
ErrorKind::BadState,
|
ErrorKind::BadState,
|
||||||
@ -2762,16 +2749,14 @@ impl Rooms {
|
|||||||
"User is not invited.",
|
"User is not invited.",
|
||||||
))?;
|
))?;
|
||||||
|
|
||||||
let servers = invite_state
|
let servers: HashSet<_> = invite_state
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|event| {
|
.filter_map(|event| serde_json::from_str(event.json().get()).ok())
|
||||||
serde_json::from_str::<serde_json::Value>(&event.json().to_string()).ok()
|
.filter_map(|event: serde_json::Value| event.get("sender").cloned())
|
||||||
})
|
|
||||||
.filter_map(|event| event.get("sender").cloned())
|
|
||||||
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
|
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
|
||||||
.filter_map(|sender| UserId::try_from(sender).ok())
|
.filter_map(|sender| UserId::try_from(sender).ok())
|
||||||
.map(|user| user.server_name().to_owned())
|
.map(|user| user.server_name().to_owned())
|
||||||
.collect::<HashSet<_>>();
|
.collect();
|
||||||
|
|
||||||
for remote_server in servers {
|
for remote_server in servers {
|
||||||
let make_leave_response = db
|
let make_leave_response = db
|
||||||
@ -2920,14 +2905,13 @@ impl Rooms {
|
|||||||
pub fn id_from_alias(&self, alias: &RoomAliasId) -> Result<Option<RoomId>> {
|
pub fn id_from_alias(&self, alias: &RoomAliasId) -> Result<Option<RoomId>> {
|
||||||
self.alias_roomid
|
self.alias_roomid
|
||||||
.get(alias.alias().as_bytes())?
|
.get(alias.alias().as_bytes())?
|
||||||
.map_or(Ok(None), |bytes| {
|
.map(|bytes| {
|
||||||
Ok(Some(
|
|
||||||
RoomId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
|
RoomId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
|
||||||
Error::bad_database("Room ID in alias_roomid is invalid unicode.")
|
Error::bad_database("Room ID in alias_roomid is invalid unicode.")
|
||||||
})?)
|
})?)
|
||||||
.map_err(|_| Error::bad_database("Room ID in alias_roomid is invalid."))?,
|
.map_err(|_| Error::bad_database("Room ID in alias_roomid is invalid."))
|
||||||
))
|
|
||||||
})
|
})
|
||||||
|
.transpose()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
@ -2987,11 +2971,11 @@ impl Rooms {
|
|||||||
.to_vec();
|
.to_vec();
|
||||||
let prefix_clone = prefix.clone();
|
let prefix_clone = prefix.clone();
|
||||||
|
|
||||||
let words = search_string
|
let words: Vec<_> = search_string
|
||||||
.split_terminator(|c: char| !c.is_alphanumeric())
|
.split_terminator(|c: char| !c.is_alphanumeric())
|
||||||
.filter(|s| !s.is_empty())
|
.filter(|s| !s.is_empty())
|
||||||
.map(str::to_lowercase)
|
.map(str::to_lowercase)
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let iterators = words.clone().into_iter().map(move |word| {
|
let iterators = words.clone().into_iter().map(move |word| {
|
||||||
let mut prefix2 = prefix.clone();
|
let mut prefix2 = prefix.clone();
|
||||||
@ -3004,12 +2988,7 @@ impl Rooms {
|
|||||||
self.tokenids
|
self.tokenids
|
||||||
.iter_from(&last_possible_id, true) // Newest pdus first
|
.iter_from(&last_possible_id, true) // Newest pdus first
|
||||||
.take_while(move |(k, _)| k.starts_with(&prefix2))
|
.take_while(move |(k, _)| k.starts_with(&prefix2))
|
||||||
.map(|(key, _)| {
|
.map(|(key, _)| key[key.len() - size_of::<u64>()..].to_vec())
|
||||||
let pdu_id = key[key.len() - size_of::<u64>()..].to_vec();
|
|
||||||
|
|
||||||
Ok::<_, Error>(pdu_id)
|
|
||||||
})
|
|
||||||
.filter_map(|r| r.ok())
|
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok((
|
Ok((
|
||||||
@ -3241,11 +3220,11 @@ impl Rooms {
|
|||||||
|
|
||||||
self.roomuserid_leftcount
|
self.roomuserid_leftcount
|
||||||
.get(&key)?
|
.get(&key)?
|
||||||
.map_or(Ok(None), |bytes| {
|
.map(|bytes| {
|
||||||
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
|
utils::u64_from_bytes(&bytes)
|
||||||
Error::bad_database("Invalid leftcount in db.")
|
.map_err(|_| Error::bad_database("Invalid leftcount in db."))
|
||||||
})?))
|
|
||||||
})
|
})
|
||||||
|
.transpose()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns an iterator over all rooms this user joined.
|
/// Returns an iterator over all rooms this user joined.
|
||||||
|
@ -162,11 +162,12 @@ impl RoomEdus {
|
|||||||
Ok(self
|
Ok(self
|
||||||
.roomuserid_lastprivatereadupdate
|
.roomuserid_lastprivatereadupdate
|
||||||
.get(&key)?
|
.get(&key)?
|
||||||
.map_or(Ok::<_, Error>(None), |bytes| {
|
.map(|bytes| {
|
||||||
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
|
utils::u64_from_bytes(&bytes).map_err(|_| {
|
||||||
Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.")
|
Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.")
|
||||||
})?))
|
})
|
||||||
})?
|
})
|
||||||
|
.transpose()?
|
||||||
.unwrap_or(0))
|
.unwrap_or(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -286,11 +287,12 @@ impl RoomEdus {
|
|||||||
Ok(self
|
Ok(self
|
||||||
.roomid_lasttypingupdate
|
.roomid_lasttypingupdate
|
||||||
.get(room_id.as_bytes())?
|
.get(room_id.as_bytes())?
|
||||||
.map_or(Ok::<_, Error>(None), |bytes| {
|
.map(|bytes| {
|
||||||
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
|
utils::u64_from_bytes(&bytes).map_err(|_| {
|
||||||
Error::bad_database("Count in roomid_lastroomactiveupdate is invalid.")
|
Error::bad_database("Count in roomid_lastroomactiveupdate is invalid.")
|
||||||
})?))
|
})
|
||||||
})?
|
})
|
||||||
|
.transpose()?
|
||||||
.unwrap_or(0))
|
.unwrap_or(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -399,7 +401,7 @@ impl RoomEdus {
|
|||||||
self.presenceid_presence
|
self.presenceid_presence
|
||||||
.get(&presence_id)?
|
.get(&presence_id)?
|
||||||
.map(|value| {
|
.map(|value| {
|
||||||
let mut presence = serde_json::from_slice::<PresenceEvent>(&value)
|
let mut presence: PresenceEvent = serde_json::from_slice(&value)
|
||||||
.map_err(|_| Error::bad_database("Invalid presence event in db."))?;
|
.map_err(|_| Error::bad_database("Invalid presence event in db."))?;
|
||||||
let current_timestamp: UInt = utils::millis_since_unix_epoch()
|
let current_timestamp: UInt = utils::millis_since_unix_epoch()
|
||||||
.try_into()
|
.try_into()
|
||||||
@ -521,7 +523,7 @@ impl RoomEdus {
|
|||||||
)
|
)
|
||||||
.map_err(|_| Error::bad_database("Invalid UserId in presenceid_presence."))?;
|
.map_err(|_| Error::bad_database("Invalid UserId in presenceid_presence."))?;
|
||||||
|
|
||||||
let mut presence = serde_json::from_slice::<PresenceEvent>(&value)
|
let mut presence: PresenceEvent = serde_json::from_slice(&value)
|
||||||
.map_err(|_| Error::bad_database("Invalid presence event in db."))?;
|
.map_err(|_| Error::bad_database("Invalid presence event in db."))?;
|
||||||
|
|
||||||
let current_timestamp: UInt = utils::millis_since_unix_epoch()
|
let current_timestamp: UInt = utils::millis_since_unix_epoch()
|
||||||
|
@ -27,7 +27,7 @@ use ruma::{
|
|||||||
OutgoingRequest,
|
OutgoingRequest,
|
||||||
},
|
},
|
||||||
device_id,
|
device_id,
|
||||||
events::{push_rules, AnySyncEphemeralRoomEvent, EventType},
|
events::{push_rules::PushRulesEvent, AnySyncEphemeralRoomEvent, EventType},
|
||||||
push,
|
push,
|
||||||
receipt::ReceiptType,
|
receipt::ReceiptType,
|
||||||
uint, MilliSecondsSinceUnixEpoch, ServerName, UInt, UserId,
|
uint, MilliSecondsSinceUnixEpoch, ServerName, UInt, UserId,
|
||||||
@ -165,13 +165,13 @@ impl Sending {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Find events that have been added since starting the last request
|
// Find events that have been added since starting the last request
|
||||||
let new_events = guard.sending.servernameevent_data
|
let new_events: Vec<_> = guard.sending.servernameevent_data
|
||||||
.scan_prefix(prefix.clone())
|
.scan_prefix(prefix.clone())
|
||||||
.filter_map(|(k, v)| {
|
.filter_map(|(k, v)| {
|
||||||
Self::parse_servercurrentevent(&k, v).ok().map(|ev| (ev, k))
|
Self::parse_servercurrentevent(&k, v).ok().map(|ev| (ev, k))
|
||||||
})
|
})
|
||||||
.take(30)
|
.take(30)
|
||||||
.collect::<Vec<_>>();
|
.collect::<>();
|
||||||
|
|
||||||
// TODO: find edus
|
// TODO: find edus
|
||||||
|
|
||||||
@ -344,8 +344,8 @@ impl Sending {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let event =
|
let event: AnySyncEphemeralRoomEvent =
|
||||||
serde_json::from_str::<AnySyncEphemeralRoomEvent>(read_receipt.json().get())
|
serde_json::from_str(read_receipt.json().get())
|
||||||
.map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?;
|
.map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?;
|
||||||
let federation_event = match event {
|
let federation_event = match event {
|
||||||
AnySyncEphemeralRoomEvent::Receipt(r) => {
|
AnySyncEphemeralRoomEvent::Receipt(r) => {
|
||||||
@ -612,9 +612,9 @@ impl Sending {
|
|||||||
|
|
||||||
let rules_for_user = db
|
let rules_for_user = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<push_rules::PushRulesEvent>(None, &userid, EventType::PushRules)
|
.get(None, &userid, EventType::PushRules)
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.map(|ev| ev.content.global)
|
.map(|ev: PushRulesEvent| ev.content.global)
|
||||||
.unwrap_or_else(|| push::Ruleset::server_default(&userid));
|
.unwrap_or_else(|| push::Ruleset::server_default(&userid));
|
||||||
|
|
||||||
let unread: UInt = db
|
let unread: UInt = db
|
||||||
|
@ -175,16 +175,14 @@ impl Uiaa {
|
|||||||
|
|
||||||
self.userdevicesessionid_uiaarequest
|
self.userdevicesessionid_uiaarequest
|
||||||
.get(&userdevicesessionid)?
|
.get(&userdevicesessionid)?
|
||||||
.map_or(Ok(None), |bytes| {
|
.map(|bytes| {
|
||||||
Ok::<_, Error>(Some(
|
|
||||||
serde_json::from_str::<CanonicalJsonValue>(
|
serde_json::from_str::<CanonicalJsonValue>(
|
||||||
&utils::string_from_bytes(&bytes).map_err(|_| {
|
&utils::string_from_bytes(&bytes)
|
||||||
Error::bad_database("Invalid uiaa request bytes in db.")
|
.map_err(|_| Error::bad_database("Invalid uiaa request bytes in db."))?,
|
||||||
})?,
|
|
||||||
)
|
)
|
||||||
.map_err(|_| Error::bad_database("Invalid uiaa request in db."))?,
|
.map_err(|_| Error::bad_database("Invalid uiaa request in db."))
|
||||||
))
|
|
||||||
})
|
})
|
||||||
|
.transpose()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_uiaa_session(
|
fn update_uiaa_session(
|
||||||
@ -225,7 +223,7 @@ impl Uiaa {
|
|||||||
userdevicesessionid.push(0xff);
|
userdevicesessionid.push(0xff);
|
||||||
userdevicesessionid.extend_from_slice(session.as_bytes());
|
userdevicesessionid.extend_from_slice(session.as_bytes());
|
||||||
|
|
||||||
let uiaainfo = serde_json::from_slice::<UiaaInfo>(
|
serde_json::from_slice(
|
||||||
&self
|
&self
|
||||||
.userdevicesessionid_uiaainfo
|
.userdevicesessionid_uiaainfo
|
||||||
.get(&userdevicesessionid)?
|
.get(&userdevicesessionid)?
|
||||||
@ -234,8 +232,6 @@ impl Uiaa {
|
|||||||
"UIAA session does not exist.",
|
"UIAA session does not exist.",
|
||||||
))?,
|
))?,
|
||||||
)
|
)
|
||||||
.map_err(|_| Error::bad_database("UiaaInfo in userdeviceid_uiaainfo is invalid."))?;
|
.map_err(|_| Error::bad_database("UiaaInfo in userdeviceid_uiaainfo is invalid."))
|
||||||
|
|
||||||
Ok(uiaainfo)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -603,10 +603,11 @@ impl Users {
|
|||||||
key.push(0xff);
|
key.push(0xff);
|
||||||
key.extend_from_slice(key_id.as_bytes());
|
key.extend_from_slice(key_id.as_bytes());
|
||||||
|
|
||||||
let mut cross_signing_key =
|
let mut cross_signing_key: serde_json::Value =
|
||||||
serde_json::from_slice::<serde_json::Value>(&self.keyid_key.get(&key)?.ok_or(
|
serde_json::from_slice(&self.keyid_key.get(&key)?.ok_or(Error::BadRequest(
|
||||||
Error::BadRequest(ErrorKind::InvalidParam, "Tried to sign nonexistent key."),
|
ErrorKind::InvalidParam,
|
||||||
)?)
|
"Tried to sign nonexistent key.",
|
||||||
|
))?)
|
||||||
.map_err(|_| Error::bad_database("key in keyid_key is invalid."))?;
|
.map_err(|_| Error::bad_database("key in keyid_key is invalid."))?;
|
||||||
|
|
||||||
let signatures = cross_signing_key
|
let signatures = cross_signing_key
|
||||||
|
@ -69,8 +69,8 @@ impl PduEvent {
|
|||||||
_ => &[],
|
_ => &[],
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut old_content =
|
let mut old_content: BTreeMap<String, serde_json::Value> =
|
||||||
serde_json::from_str::<BTreeMap<String, serde_json::Value>>(self.content.get())
|
serde_json::from_str(self.content.get())
|
||||||
.map_err(|_| Error::bad_database("PDU in db has invalid content."))?;
|
.map_err(|_| Error::bad_database("PDU in db has invalid content."))?;
|
||||||
|
|
||||||
let mut new_content = serde_json::Map::new();
|
let mut new_content = serde_json::Map::new();
|
||||||
@ -92,8 +92,8 @@ impl PduEvent {
|
|||||||
|
|
||||||
pub fn remove_transaction_id(&mut self) -> crate::Result<()> {
|
pub fn remove_transaction_id(&mut self) -> crate::Result<()> {
|
||||||
if let Some(unsigned) = &self.unsigned {
|
if let Some(unsigned) = &self.unsigned {
|
||||||
let mut unsigned =
|
let mut unsigned: BTreeMap<String, Box<RawJsonValue>> =
|
||||||
serde_json::from_str::<BTreeMap<String, Box<RawJsonValue>>>(unsigned.get())
|
serde_json::from_str(unsigned.get())
|
||||||
.map_err(|_| Error::bad_database("Invalid unsigned in pdu event"))?;
|
.map_err(|_| Error::bad_database("Invalid unsigned in pdu event"))?;
|
||||||
unsigned.remove("transaction_id");
|
unsigned.remove("transaction_id");
|
||||||
self.unsigned = Some(to_raw_value(&unsigned).expect("unsigned is valid"));
|
self.unsigned = Some(to_raw_value(&unsigned).expect("unsigned is valid"));
|
||||||
|
@ -49,6 +49,7 @@ use ruma::{
|
|||||||
},
|
},
|
||||||
int,
|
int,
|
||||||
receipt::ReceiptType,
|
receipt::ReceiptType,
|
||||||
|
serde::JsonObject,
|
||||||
signatures::{CanonicalJsonObject, CanonicalJsonValue},
|
signatures::{CanonicalJsonObject, CanonicalJsonValue},
|
||||||
state_res::{self, RoomVersion, StateMap},
|
state_res::{self, RoomVersion, StateMap},
|
||||||
to_device::DeviceIdOrAllDevices,
|
to_device::DeviceIdOrAllDevices,
|
||||||
@ -1003,12 +1004,12 @@ pub(crate) async fn handle_incoming_pdu<'a>(
|
|||||||
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
|
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
|
||||||
let mut graph = HashMap::new();
|
let mut graph = HashMap::new();
|
||||||
let mut eventid_info = HashMap::new();
|
let mut eventid_info = HashMap::new();
|
||||||
let mut todo_outlier_stack = incoming_pdu
|
let mut todo_outlier_stack: Vec<_> = incoming_pdu
|
||||||
.prev_events
|
.prev_events
|
||||||
.iter()
|
.iter()
|
||||||
.cloned()
|
.cloned()
|
||||||
.map(Arc::new)
|
.map(Arc::new)
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let mut amount = 0;
|
let mut amount = 0;
|
||||||
|
|
||||||
@ -1150,10 +1151,8 @@ fn handle_outlier_pdu<'a>(
|
|||||||
// 2. Check signatures, otherwise drop
|
// 2. Check signatures, otherwise drop
|
||||||
// 3. check content hash, redact if doesn't match
|
// 3. check content hash, redact if doesn't match
|
||||||
|
|
||||||
let create_event_content = serde_json::from_str::<RoomCreateEventContent>(
|
let create_event_content: RoomCreateEventContent =
|
||||||
create_event.content.get(),
|
serde_json::from_str(create_event.content.get()).map_err(|e| {
|
||||||
)
|
|
||||||
.map_err(|e| {
|
|
||||||
warn!("Invalid create event: {}", e);
|
warn!("Invalid create event: {}", e);
|
||||||
"Invalid create event in db.".to_owned()
|
"Invalid create event in db.".to_owned()
|
||||||
})?;
|
})?;
|
||||||
@ -1315,10 +1314,8 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|||||||
return Err("Event has been soft failed".into());
|
return Err("Event has been soft failed".into());
|
||||||
}
|
}
|
||||||
|
|
||||||
let create_event_content = serde_json::from_str::<RoomCreateEventContent>(
|
let create_event_content: RoomCreateEventContent =
|
||||||
create_event.content.get(),
|
serde_json::from_str(create_event.content.get()).map_err(|e| {
|
||||||
)
|
|
||||||
.map_err(|e| {
|
|
||||||
warn!("Invalid create event: {}", e);
|
warn!("Invalid create event: {}", e);
|
||||||
"Invalid create event in db.".to_owned()
|
"Invalid create event in db.".to_owned()
|
||||||
})?;
|
})?;
|
||||||
@ -1633,7 +1630,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|||||||
.compress_state_event(*shortstatekey, id, &db.globals)
|
.compress_state_event(*shortstatekey, id, &db.globals)
|
||||||
.map_err(|_| "Failed to compress_state_event".to_owned())
|
.map_err(|_| "Failed to compress_state_event".to_owned())
|
||||||
})
|
})
|
||||||
.collect::<StdResult<_, String>>()?;
|
.collect::<StdResult<_, _>>()?;
|
||||||
|
|
||||||
// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it
|
// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it
|
||||||
debug!("starting soft fail auth check");
|
debug!("starting soft fail auth check");
|
||||||
@ -1753,7 +1750,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|||||||
.compress_state_event(*k, id, &db.globals)
|
.compress_state_event(*k, id, &db.globals)
|
||||||
.map_err(|_| "Failed to compress_state_event.".to_owned())
|
.map_err(|_| "Failed to compress_state_event.".to_owned())
|
||||||
})
|
})
|
||||||
.collect::<StdResult<_, String>>()?
|
.collect::<StdResult<_, _>>()?
|
||||||
} else {
|
} else {
|
||||||
// We do need to force an update to this room's state
|
// We do need to force an update to this room's state
|
||||||
update_state = true;
|
update_state = true;
|
||||||
@ -1772,7 +1769,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let fork_states = &fork_states
|
let fork_states: Vec<_> = fork_states
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|map| {
|
.map(|map| {
|
||||||
map.into_iter()
|
map.into_iter()
|
||||||
@ -1783,12 +1780,12 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|||||||
})
|
})
|
||||||
.collect::<Result<StateMap<_>>>()
|
.collect::<Result<StateMap<_>>>()
|
||||||
})
|
})
|
||||||
.collect::<Result<Vec<_>>>()
|
.collect::<Result<_>>()
|
||||||
.map_err(|_| "Failed to get_statekey_from_short.".to_owned())?;
|
.map_err(|_| "Failed to get_statekey_from_short.".to_owned())?;
|
||||||
|
|
||||||
let state = match state_res::resolve(
|
let state = match state_res::resolve(
|
||||||
room_version_id,
|
room_version_id,
|
||||||
fork_states,
|
&fork_states,
|
||||||
auth_chain_sets,
|
auth_chain_sets,
|
||||||
|id| {
|
|id| {
|
||||||
let res = db.rooms.get_pdu(id);
|
let res = db.rooms.get_pdu(id);
|
||||||
@ -1815,7 +1812,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|||||||
.compress_state_event(shortstatekey, &event_id, &db.globals)
|
.compress_state_event(shortstatekey, &event_id, &db.globals)
|
||||||
.map_err(|_| "Failed to compress state event".to_owned())
|
.map_err(|_| "Failed to compress state event".to_owned())
|
||||||
})
|
})
|
||||||
.collect::<StdResult<_, String>>()?
|
.collect::<StdResult<_, _>>()?
|
||||||
};
|
};
|
||||||
|
|
||||||
// Set the new room state to the resolved state
|
// Set the new room state to the resolved state
|
||||||
@ -2035,12 +2032,12 @@ pub(crate) async fn fetch_signing_keys(
|
|||||||
|
|
||||||
trace!("Loading signing keys for {}", origin);
|
trace!("Loading signing keys for {}", origin);
|
||||||
|
|
||||||
let mut result = db
|
let mut result: BTreeMap<_, _> = db
|
||||||
.globals
|
.globals
|
||||||
.signing_keys_for(origin)?
|
.signing_keys_for(origin)?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(k, v)| (k.to_string(), v.key))
|
.map(|(k, v)| (k.to_string(), v.key))
|
||||||
.collect::<BTreeMap<_, _>>();
|
.collect();
|
||||||
|
|
||||||
if contains_all_ids(&result) {
|
if contains_all_ids(&result) {
|
||||||
return Ok(result);
|
return Ok(result);
|
||||||
@ -2245,11 +2242,7 @@ pub(crate) fn get_auth_chain<'a>(
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let chunk_key = chunk
|
let chunk_key: Vec<u64> = chunk.iter().map(|(short, _)| short).copied().collect();
|
||||||
.iter()
|
|
||||||
.map(|(short, _)| short)
|
|
||||||
.copied()
|
|
||||||
.collect::<Vec<u64>>();
|
|
||||||
if let Some(cached) = db.rooms.get_auth_chain_from_cache(&chunk_key)? {
|
if let Some(cached) = db.rooms.get_auth_chain_from_cache(&chunk_key)? {
|
||||||
hits += 1;
|
hits += 1;
|
||||||
full_auth_chain.extend(cached.iter().copied());
|
full_auth_chain.extend(cached.iter().copied());
|
||||||
@ -2564,9 +2557,9 @@ pub fn get_room_state_route(
|
|||||||
Ok(get_room_state::v1::Response {
|
Ok(get_room_state::v1::Response {
|
||||||
auth_chain: auth_chain_ids
|
auth_chain: auth_chain_ids
|
||||||
.map(|id| {
|
.map(|id| {
|
||||||
Ok::<_, Error>(PduEvent::convert_to_outgoing_federation_event(
|
db.rooms.get_pdu_json(&id).map(|maybe_json| {
|
||||||
db.rooms.get_pdu_json(&id)?.unwrap(),
|
PduEvent::convert_to_outgoing_federation_event(maybe_json.unwrap())
|
||||||
))
|
})
|
||||||
})
|
})
|
||||||
.filter_map(|r| r.ok())
|
.filter_map(|r| r.ok())
|
||||||
.collect(),
|
.collect(),
|
||||||
@ -2650,26 +2643,24 @@ pub fn create_join_event_template_route(
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let prev_events = db
|
let prev_events: Vec<_> = db
|
||||||
.rooms
|
.rooms
|
||||||
.get_pdu_leaves(&body.room_id)?
|
.get_pdu_leaves(&body.room_id)?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.take(20)
|
.take(20)
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
let create_event = db
|
let create_event = db
|
||||||
.rooms
|
.rooms
|
||||||
.room_state_get(&body.room_id, &EventType::RoomCreate, "")?;
|
.room_state_get(&body.room_id, &EventType::RoomCreate, "")?;
|
||||||
|
|
||||||
let create_event_content = create_event
|
let create_event_content: Option<RoomCreateEventContent> = create_event
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|create_event| {
|
.map(|create_event| {
|
||||||
serde_json::from_str::<RoomCreateEventContent>(create_event.content.get()).map_err(
|
serde_json::from_str(create_event.content.get()).map_err(|e| {
|
||||||
|e| {
|
|
||||||
warn!("Invalid create event: {}", e);
|
warn!("Invalid create event: {}", e);
|
||||||
Error::bad_database("Invalid create event in db.")
|
Error::bad_database("Invalid create event in db.")
|
||||||
},
|
})
|
||||||
)
|
|
||||||
})
|
})
|
||||||
.transpose()?;
|
.transpose()?;
|
||||||
|
|
||||||
@ -2835,7 +2826,7 @@ async fn create_join_event(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let origin = serde_json::from_value::<Box<ServerName>>(
|
let origin: Box<ServerName> = serde_json::from_value(
|
||||||
serde_json::to_value(value.get("origin").ok_or(Error::BadRequest(
|
serde_json::to_value(value.get("origin").ok_or(Error::BadRequest(
|
||||||
ErrorKind::InvalidParam,
|
ErrorKind::InvalidParam,
|
||||||
"Event needs an origin field.",
|
"Event needs an origin field.",
|
||||||
@ -3009,15 +3000,12 @@ pub async fn create_invite_route(
|
|||||||
|
|
||||||
let mut invite_state = body.invite_room_state.clone();
|
let mut invite_state = body.invite_room_state.clone();
|
||||||
|
|
||||||
let mut event =
|
let mut event: JsonObject = serde_json::from_str(body.event.get())
|
||||||
serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(body.event.get())
|
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event bytes."))?;
|
||||||
.map_err(|_| {
|
|
||||||
Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event bytes.")
|
|
||||||
})?;
|
|
||||||
|
|
||||||
event.insert("event_id".to_owned(), "$dummy".into());
|
event.insert("event_id".to_owned(), "$dummy".into());
|
||||||
|
|
||||||
let pdu = serde_json::from_value::<PduEvent>(event.into()).map_err(|e| {
|
let pdu: PduEvent = serde_json::from_value(event.into()).map_err(|e| {
|
||||||
warn!("Invalid invite event: {}", e);
|
warn!("Invalid invite event: {}", e);
|
||||||
Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event.")
|
Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event.")
|
||||||
})?;
|
})?;
|
||||||
@ -3282,7 +3270,7 @@ fn get_server_keys_from_cache(
|
|||||||
pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap<String, BTreeMap<String, String>>>,
|
pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap<String, BTreeMap<String, String>>>,
|
||||||
db: &Database,
|
db: &Database,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let value = serde_json::from_str::<CanonicalJsonObject>(pdu.get()).map_err(|e| {
|
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
|
||||||
error!("Invalid PDU in server response: {:?}: {:?}", pdu, e);
|
error!("Invalid PDU in server response: {:?}: {:?}", pdu, e);
|
||||||
Error::BadServerResponse("Invalid PDU in server response")
|
Error::BadServerResponse("Invalid PDU in server response")
|
||||||
})?;
|
})?;
|
||||||
@ -3343,19 +3331,16 @@ fn get_server_keys_from_cache(
|
|||||||
|
|
||||||
trace!("Loading signing keys for {}", origin);
|
trace!("Loading signing keys for {}", origin);
|
||||||
|
|
||||||
let result = db
|
let result: BTreeMap<_, _> = db
|
||||||
.globals
|
.globals
|
||||||
.signing_keys_for(origin)?
|
.signing_keys_for(origin)?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(k, v)| (k.to_string(), v.key))
|
.map(|(k, v)| (k.to_string(), v.key))
|
||||||
.collect::<BTreeMap<_, _>>();
|
.collect();
|
||||||
|
|
||||||
if !contains_all_ids(&result) {
|
if !contains_all_ids(&result) {
|
||||||
trace!("Signing key not loaded for {}", origin);
|
trace!("Signing key not loaded for {}", origin);
|
||||||
servers.insert(
|
servers.insert(origin.clone(), BTreeMap::new());
|
||||||
origin.clone(),
|
|
||||||
BTreeMap::<ServerSigningKeyId, QueryCriteria>::new(),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub_key_map.insert(origin.to_string(), result);
|
pub_key_map.insert(origin.to_string(), result);
|
||||||
@ -3370,8 +3355,8 @@ pub(crate) async fn fetch_join_signing_keys(
|
|||||||
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
||||||
db: &Database,
|
db: &Database,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut servers =
|
let mut servers: BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, QueryCriteria>> =
|
||||||
BTreeMap::<Box<ServerName>, BTreeMap<ServerSigningKeyId, QueryCriteria>>::new();
|
BTreeMap::new();
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut pkm = pub_key_map
|
let mut pkm = pub_key_map
|
||||||
@ -3436,7 +3421,7 @@ pub(crate) async fn fetch_join_signing_keys(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut futures = servers
|
let mut futures: FuturesUnordered<_> = servers
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(server, _)| async move {
|
.map(|(server, _)| async move {
|
||||||
(
|
(
|
||||||
@ -3450,16 +3435,16 @@ pub(crate) async fn fetch_join_signing_keys(
|
|||||||
server,
|
server,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.collect::<FuturesUnordered<_>>();
|
.collect();
|
||||||
|
|
||||||
while let Some(result) = futures.next().await {
|
while let Some(result) = futures.next().await {
|
||||||
if let (Ok(get_keys_response), origin) = result {
|
if let (Ok(get_keys_response), origin) = result {
|
||||||
let result = db
|
let result: BTreeMap<_, _> = db
|
||||||
.globals
|
.globals
|
||||||
.add_signing_key(&origin, get_keys_response.server_key.clone())?
|
.add_signing_key(&origin, get_keys_response.server_key.clone())?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(k, v)| (k.to_string(), v.key))
|
.map(|(k, v)| (k.to_string(), v.key))
|
||||||
.collect::<BTreeMap<_, _>>();
|
.collect();
|
||||||
|
|
||||||
pub_key_map
|
pub_key_map
|
||||||
.write()
|
.write()
|
||||||
|
Loading…
Reference in New Issue
Block a user