feat: presence updates

This commit is contained in:
timokoesters 2020-05-09 21:47:09 +02:00
parent 551308e9a8
commit ee0d6940bd
No known key found for this signature in database
GPG Key ID: 356E705610F626D5
5 changed files with 293 additions and 193 deletions

View File

@ -12,7 +12,6 @@ use ruma_client_api::{
account::{get_username_availability, register},
alias::get_alias,
capabilities::get_capabilities,
to_device::send_event_to_device,
config::{get_global_account_data, set_global_account_data},
directory::{self, get_public_rooms_filtered},
filter::{self, create_filter, get_filter},
@ -34,13 +33,14 @@ use ruma_client_api::{
state::{create_state_event_for_empty_key, create_state_event_for_key},
sync::sync_events,
thirdparty::get_protocols,
to_device::send_event_to_device,
typing::create_typing_event,
uiaa::{AuthFlow, UiaaInfo, UiaaResponse},
user_directory::search_users,
},
unversioned::get_supported_versions,
};
use ruma_events::{collections::only::Event as EduEvent, EventType};
use ruma_events::{collections::only::Event as EduEvent, EventJson, EventType};
use ruma_identifiers::{RoomId, UserId};
use serde_json::{json, value::RawValue};
@ -165,45 +165,45 @@ pub fn register_route(
let token = utils::random_string(TOKEN_LENGTH);
// Add device
db
.users
db.users
.create_device(&user_id, &device_id, &token)
.unwrap();
// Initial data
db.account_data.update(
None,
&user_id,
EduEvent::PushRules(ruma_events::push_rules::PushRulesEvent {
content: ruma_events::push_rules::PushRulesEventContent {
global: ruma_events::push_rules::Ruleset {
content: vec![],
override_rules: vec![],
room: vec![],
sender: vec![],
underride: vec![ruma_events::push_rules::ConditionalPushRule {
actions: vec![
ruma_events::push_rules::Action::Notify,
ruma_events::push_rules::Action::SetTweak(
ruma_common::push::Tweak::Highlight(false),
),
],
default: true,
enabled: true,
rule_id: ".m.rule.message".to_owned(),
conditions: vec![ruma_events::push_rules::PushCondition::EventMatch(
ruma_events::push_rules::EventMatchCondition {
key: "type".to_owned(),
pattern: "m.room.message".to_owned(),
},
)],
}],
db.account_data
.update(
None,
&user_id,
EduEvent::PushRules(ruma_events::push_rules::PushRulesEvent {
content: ruma_events::push_rules::PushRulesEventContent {
global: ruma_events::push_rules::Ruleset {
content: vec![],
override_rules: vec![],
room: vec![],
sender: vec![],
underride: vec![ruma_events::push_rules::ConditionalPushRule {
actions: vec![
ruma_events::push_rules::Action::Notify,
ruma_events::push_rules::Action::SetTweak(
ruma_common::push::Tweak::Highlight(false),
),
],
default: true,
enabled: true,
rule_id: ".m.rule.message".to_owned(),
conditions: vec![ruma_events::push_rules::PushCondition::EventMatch(
ruma_events::push_rules::EventMatchCondition {
key: "type".to_owned(),
pattern: "m.room.message".to_owned(),
},
)],
}],
},
},
},
}),
&db.globals,
)
.unwrap();
}),
&db.globals,
)
.unwrap();
MatrixResult(Ok(register::Response {
access_token: Some(token),
@ -220,7 +220,10 @@ pub fn get_login_route() -> MatrixResult<get_login_types::Response> {
}
#[post("/_matrix/client/r0/login", data = "<body>")]
pub fn login_route(db: State<'_, Database>, body: Ruma<login::Request>) -> MatrixResult<login::Response> {
pub fn login_route(
db: State<'_, Database>,
body: Ruma<login::Request>,
) -> MatrixResult<login::Response> {
// Validate login method
let user_id =
if let (login::UserInfo::MatrixId(mut username), login::LoginInfo::Password { password }) =
@ -280,8 +283,7 @@ pub fn login_route(db: State<'_, Database>, body: Ruma<login::Request>) -> Matri
let token = utils::random_string(TOKEN_LENGTH);
// Add device
db
.users
db.users
.create_device(&user_id, &device_id, &token)
.unwrap();
@ -318,7 +320,7 @@ pub fn get_pushrules_all_route() -> MatrixResult<get_pushrules_all::Response> {
vec![push::PushRule {
actions: vec![
push::Action::Notify,
push::Action::SetTweak(ruma_common::push::Tweak::Highlight(false))
push::Action::SetTweak(ruma_common::push::Tweak::Highlight(false)),
],
default: true,
enabled: true,
@ -346,39 +348,40 @@ pub fn set_pushrule_route(
) -> MatrixResult<set_pushrule::Response> {
// TODO
let user_id = body.user_id.clone().expect("user is authenticated");
db.account_data.update(
None,
&user_id,
EduEvent::PushRules(ruma_events::push_rules::PushRulesEvent {
content: ruma_events::push_rules::PushRulesEventContent {
global: ruma_events::push_rules::Ruleset {
content: vec![],
override_rules: vec![],
room: vec![],
sender: vec![],
underride: vec![ruma_events::push_rules::ConditionalPushRule {
actions: vec![
ruma_events::push_rules::Action::Notify,
ruma_events::push_rules::Action::SetTweak(
ruma_common::push::Tweak::Highlight(false),
),
],
default: true,
enabled: true,
rule_id: ".m.rule.message".to_owned(),
conditions: vec![ruma_events::push_rules::PushCondition::EventMatch(
ruma_events::push_rules::EventMatchCondition {
key: "type".to_owned(),
pattern: "m.room.message".to_owned(),
},
)],
}],
db.account_data
.update(
None,
&user_id,
EduEvent::PushRules(ruma_events::push_rules::PushRulesEvent {
content: ruma_events::push_rules::PushRulesEventContent {
global: ruma_events::push_rules::Ruleset {
content: vec![],
override_rules: vec![],
room: vec![],
sender: vec![],
underride: vec![ruma_events::push_rules::ConditionalPushRule {
actions: vec![
ruma_events::push_rules::Action::Notify,
ruma_events::push_rules::Action::SetTweak(
ruma_common::push::Tweak::Highlight(false),
),
],
default: true,
enabled: true,
rule_id: ".m.rule.message".to_owned(),
conditions: vec![ruma_events::push_rules::PushCondition::EventMatch(
ruma_events::push_rules::EventMatchCondition {
key: "type".to_owned(),
pattern: "m.room.message".to_owned(),
},
)],
}],
},
},
},
}),
&db.globals
)
.unwrap();
}),
&db.globals,
)
.unwrap();
MatrixResult(Ok(set_pushrule::Response))
}
@ -393,9 +396,7 @@ pub fn set_pushrule_enabled_route(
MatrixResult(Ok(set_pushrule_enabled::Response))
}
#[get(
"/_matrix/client/r0/user/<_user_id>/filter/<_filter_id>",
)]
#[get("/_matrix/client/r0/user/<_user_id>/filter/<_filter_id>")]
pub fn get_filter_route(
_user_id: String,
_filter_id: String,
@ -413,18 +414,14 @@ pub fn get_filter_route(
}
#[post("/_matrix/client/r0/user/<_user_id>/filter")]
pub fn create_filter_route(
_user_id: String,
) -> MatrixResult<create_filter::Response> {
pub fn create_filter_route(_user_id: String) -> MatrixResult<create_filter::Response> {
// TODO
MatrixResult(Ok(create_filter::Response {
filter_id: utils::random_string(10),
}))
}
#[put(
"/_matrix/client/r0/user/<_user_id>/account_data/<_type>",
)]
#[put("/_matrix/client/r0/user/<_user_id>/account_data/<_type>")]
pub fn set_global_account_data_route(
_user_id: String,
_type: String,
@ -432,9 +429,7 @@ pub fn set_global_account_data_route(
MatrixResult(Ok(set_global_account_data::Response))
}
#[get(
"/_matrix/client/r0/user/<_user_id>/account_data/<_type>",
)]
#[get("/_matrix/client/r0/user/<_user_id>/account_data/<_type>")]
pub fn get_global_account_data_route(
_user_id: String,
_type: String,
@ -460,25 +455,44 @@ pub fn set_displayname_route(
if displayname == "" {
db.users.set_displayname(&user_id, None).unwrap();
} else {
db
.users
db.users
.set_displayname(&user_id, Some(displayname.clone()))
.unwrap();
}
// Send a new membership event into all joined rooms
for room_id in db.rooms.rooms_joined(&user_id) {
db.rooms.append_pdu(
room_id.unwrap(),
user_id.clone(),
EventType::RoomMember,
json!({"membership": "join", "displayname": displayname}),
None,
Some(user_id.to_string()),
&db.globals
).unwrap();
db.rooms
.append_pdu(
room_id.unwrap(),
user_id.clone(),
EventType::RoomMember,
json!({"membership": "join", "displayname": displayname}),
None,
Some(user_id.to_string()),
&db.globals,
)
.unwrap();
}
// TODO: send a new m.presence event
// Presence update
db.global_edus
.update_globallatest(
&user_id,
EduEvent::Presence(ruma_events::presence::PresenceEvent {
content: ruma_events::presence::PresenceEventContent {
avatar_url: db.users.avatar_url(&user_id).unwrap(),
currently_active: None,
displayname: db.users.displayname(&user_id).unwrap(),
last_active_ago: Some(utils::millis_since_unix_epoch().try_into().unwrap()),
presence: ruma_events::presence::PresenceState::Online,
status_msg: None,
},
sender: user_id.clone(),
}),
&db.globals,
)
.unwrap();
} else {
// Send error on None
// Synapse returns a parsing error but the spec doesn't require this
@ -542,8 +556,7 @@ pub fn set_avatar_url_route(
if body.avatar_url == "" {
db.users.set_avatar_url(&user_id, None).unwrap();
} else {
db
.users
db.users
.set_avatar_url(&user_id, Some(body.avatar_url.clone()))
.unwrap();
// TODO send a new m.room.member join event with the updated avatar_url
@ -605,11 +618,32 @@ pub fn get_profile_route(
}))
}
#[put("/_matrix/client/r0/presence/<_user_id>/status")]
#[put("/_matrix/client/r0/presence/<_user_id>/status", data = "<body>")]
pub fn set_presence_route(
db: State<'_, Database>,
body: Ruma<set_presence::Request>,
_user_id: String,
) -> MatrixResult<set_presence::Response> {
// TODO
let user_id = body.user_id.clone().expect("user is authenticated");
db.global_edus
.update_globallatest(
&user_id,
EduEvent::Presence(ruma_events::presence::PresenceEvent {
content: ruma_events::presence::PresenceEventContent {
avatar_url: db.users.avatar_url(&user_id).unwrap(),
currently_active: None,
displayname: db.users.displayname(&user_id).unwrap(),
last_active_ago: Some(utils::millis_since_unix_epoch().try_into().unwrap()),
presence: body.presence,
status_msg: body.status_msg.clone(),
},
sender: user_id.clone(),
}),
&db.globals,
)
.unwrap();
MatrixResult(Ok(set_presence::Response))
}
@ -637,28 +671,27 @@ pub fn set_read_marker_route(
_room_id: String,
) -> MatrixResult<set_read_marker::Response> {
let user_id = body.user_id.clone().expect("user is authenticated");
db.account_data.update(
Some(&body.room_id),
&user_id,
EduEvent::FullyRead(ruma_events::fully_read::FullyReadEvent {
content: ruma_events::fully_read::FullyReadEventContent {
event_id: body.fully_read.clone(),
},
room_id: Some(body.room_id.clone()),
}),
&db.globals
)
.unwrap();
db.account_data
.update(
Some(&body.room_id),
&user_id,
EduEvent::FullyRead(ruma_events::fully_read::FullyReadEvent {
content: ruma_events::fully_read::FullyReadEventContent {
event_id: body.fully_read.clone(),
},
room_id: Some(body.room_id.clone()),
}),
&db.globals,
)
.unwrap();
if let Some(event) = &body.read_receipt {
db
.rooms
db.rooms
.edus
.room_read_set(
&body.room_id,
&user_id,
db
.rooms
db.rooms
.get_pdu_count(event)
.unwrap()
.expect("TODO: what if a client specifies an invalid event"),
@ -680,8 +713,7 @@ pub fn set_read_marker_route(
},
);
db
.rooms
db.rooms
.edus
.roomlatest_update(
&user_id,
@ -716,8 +748,7 @@ pub fn create_typing_event_route(
});
if body.typing {
db
.rooms
db.rooms
.edus
.roomactive_add(
edu,
@ -728,11 +759,7 @@ pub fn create_typing_event_route(
)
.unwrap();
} else {
db
.rooms
.edus
.roomactive_remove(edu, &body.room_id)
.unwrap();
db.rooms.edus.roomactive_remove(edu, &body.room_id).unwrap();
}
MatrixResult(Ok(create_typing_event::Response))
@ -747,8 +774,7 @@ pub fn create_room_route(
let room_id = RoomId::try_from(db.globals.hostname()).expect("host is valid");
let user_id = body.user_id.clone().expect("user is authenticated");
db
.rooms
db.rooms
.append_pdu(
room_id.clone(),
user_id.clone(),
@ -760,8 +786,7 @@ pub fn create_room_route(
)
.unwrap();
db
.rooms
db.rooms
.join(
&room_id,
&user_id,
@ -770,8 +795,7 @@ pub fn create_room_route(
)
.unwrap();
db
.rooms
db.rooms
.append_pdu(
room_id.clone(),
user_id.clone(),
@ -793,8 +817,7 @@ pub fn create_room_route(
.unwrap();
if let Some(name) = &body.name {
db
.rooms
db.rooms
.append_pdu(
room_id.clone(),
user_id.clone(),
@ -808,8 +831,7 @@ pub fn create_room_route(
}
if let Some(topic) = &body.topic {
db
.rooms
db.rooms
.append_pdu(
room_id.clone(),
user_id.clone(),
@ -823,8 +845,7 @@ pub fn create_room_route(
}
for user in &body.invite {
db
.rooms
db.rooms
.invite(&user_id, &room_id, user, &db.globals)
.unwrap();
}
@ -945,8 +966,7 @@ pub fn leave_room_route(
_room_id: String,
) -> MatrixResult<leave_room::Response> {
let user_id = body.user_id.clone().expect("user is authenticated");
db
.rooms
db.rooms
.leave(&user_id, &body.room_id, &user_id, &db.globals)
.unwrap();
MatrixResult(Ok(leave_room::Response))
@ -970,8 +990,7 @@ pub fn invite_user_route(
_room_id: String,
) -> MatrixResult<invite_user::Response> {
if let invite_user::InvitationRecipient::UserId { user_id } = &body.recipient {
db
.rooms
db.rooms
.invite(
&body.user_id.as_ref().expect("user is authenticated"),
&body.room_id,
@ -1069,16 +1088,13 @@ pub fn search_users_route(
}
#[get("/_matrix/client/r0/rooms/<_room_id>/members")]
pub fn get_member_events_route(
_room_id: String,
) -> MatrixResult<get_member_events::Response> {
pub fn get_member_events_route(_room_id: String) -> MatrixResult<get_member_events::Response> {
// TODO
MatrixResult(Ok(get_member_events::Response { chunk: Vec::new() }))
}
#[get("/_matrix/client/r0/thirdparty/protocols")]
pub fn get_protocols_route(
) -> MatrixResult<get_protocols::Response> {
pub fn get_protocols_route() -> MatrixResult<get_protocols::Response> {
// TODO
MatrixResult(Ok(get_protocols::Response {
protocols: BTreeMap::new(),
@ -1114,7 +1130,9 @@ pub fn create_message_event_route(
)
.expect("message events are always okay");
MatrixResult(Ok(create_message_event::Response { event_id: Some(event_id) }))
MatrixResult(Ok(create_message_event::Response {
event_id: Some(event_id),
}))
}
#[put(
@ -1144,7 +1162,9 @@ pub fn create_state_event_for_key_route(
)
.unwrap();
MatrixResult(Ok(create_state_event_for_key::Response { event_id: Some(event_id) }))
MatrixResult(Ok(create_state_event_for_key::Response {
event_id: Some(event_id),
}))
}
#[put(
@ -1173,7 +1193,9 @@ pub fn create_state_event_for_empty_key_route(
)
.unwrap();
MatrixResult(Ok(create_state_event_for_empty_key::Response { event_id: Some(event_id) }))
MatrixResult(Ok(create_state_event_for_empty_key::Response {
event_id: Some(event_id),
}))
}
#[get("/_matrix/client/r0/sync", data = "<body>")]
@ -1197,7 +1219,8 @@ pub fn sync_route(
let mut pdus = db
.rooms
.pdus_since(&room_id, since).unwrap()
.pdus_since(&room_id, since)
.unwrap()
.map(|r| r.unwrap())
.collect::<Vec<_>>();
@ -1213,16 +1236,12 @@ pub fn sync_route(
}
}
let notification_count = if let Some(last_read) = db
.rooms
.edus
.room_read_get(&room_id, &user_id)
.unwrap()
{
Some((db.rooms.pdus_since(&room_id, last_read).unwrap().count() as u32).into())
} else {
None
};
let notification_count =
if let Some(last_read) = db.rooms.edus.room_read_get(&room_id, &user_id).unwrap() {
Some((db.rooms.pdus_since(&room_id, last_read).unwrap().count() as u32).into())
} else {
None
};
// They /sync response doesn't always return all messages, so we say the output is
// limited unless there are enough events
@ -1262,10 +1281,10 @@ pub fn sync_route(
}
edus.extend(
db
.rooms
db.rooms
.edus
.roomlatests_since(&room_id, since).unwrap()
.roomlatests_since(&room_id, since)
.unwrap()
.map(|r| r.unwrap()),
);
@ -1273,8 +1292,10 @@ pub fn sync_route(
room_id.clone().try_into().unwrap(),
sync_events::JoinedRoom {
account_data: Some(sync_events::AccountData {
events: db.account_data
.changes_since(Some(&room_id), &user_id, since).unwrap()
events: db
.account_data
.changes_since(Some(&room_id), &user_id, since)
.unwrap()
.into_iter()
.map(|(_, v)| v)
.collect(),
@ -1304,8 +1325,7 @@ pub fn sync_route(
// TODO: state before timeline
state: sync_events::State {
events: if send_full_state {
db
.rooms
db.rooms
.room_state(&room_id)
.unwrap()
.into_iter()
@ -1332,17 +1352,12 @@ pub fn sync_route(
let mut edus = db
.rooms
.edus
.roomlatests_since(&room_id, since).unwrap()
.roomlatests_since(&room_id, since)
.unwrap()
.map(|r| r.unwrap())
.collect::<Vec<_>>();
edus.extend(
db
.rooms
.edus
.roomactives_all(&room_id)
.map(|r| r.unwrap()),
);
edus.extend(db.rooms.edus.roomactives_all(&room_id).map(|r| r.unwrap()));
left_rooms.insert(
room_id.clone().try_into().unwrap(),
@ -1363,7 +1378,8 @@ pub fn sync_route(
let room_id = room_id.unwrap();
let events = db
.rooms
.pdus_since(&room_id, since).unwrap()
.pdus_since(&room_id, since)
.unwrap()
.into_iter()
.map(|pdu| pdu.unwrap().to_stripped_state_event())
.collect();
@ -1383,10 +1399,23 @@ pub fn sync_route(
join: joined_rooms,
invite: invited_rooms,
},
presence: sync_events::Presence { events: Vec::new() },
presence: sync_events::Presence {
events: db
.global_edus
.globallatests_since(since)
.unwrap()
.map(|edu| {
EventJson::<ruma_events::presence::PresenceEvent>::from(
edu.unwrap().json().to_owned(),
)
})
.collect(),
},
account_data: sync_events::AccountData {
events: db.account_data
.changes_since(None, &user_id, since).unwrap()
events: db
.account_data
.changes_since(None, &user_id, since)
.unwrap()
.into_iter()
.map(|(_, v)| v)
.collect(),

View File

@ -1,4 +1,5 @@
pub(self) mod account_data;
pub(self) mod global_edus;
pub(self) mod globals;
pub(self) mod rooms;
pub(self) mod users;
@ -11,8 +12,7 @@ pub struct Database {
pub users: users::Users,
pub rooms: rooms::Rooms,
pub account_data: account_data::AccountData,
//pub globalallid_globalall: sled::Tree, // ToDevice, GlobalAllId = UserId + Count
//pub globallatestid_globallatest: sled::Tree, // Presence, GlobalLatestId = Count + Type + UserId
pub global_edus: global_edus::GlobalEdus,
pub _db: sled::Db,
}
@ -66,8 +66,10 @@ impl Database {
account_data: account_data::AccountData {
roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata").unwrap(),
},
//globalallid_globalall: db.open_tree("globalallid_globalall").unwrap(),
//globallatestid_globallatest: db.open_tree("globallatestid_globallatest").unwrap(),
global_edus: global_edus::GlobalEdus {
//globalallid_globalall: db.open_tree("globalallid_globalall").unwrap(),
globallatestid_globallatest: db.open_tree("globallatestid_globallatest").unwrap(), // Presence
},
_db: db,
}
}

View File

@ -0,0 +1,68 @@
use crate::Result;
use ruma_events::{collections::only::Event as EduEvent, EventJson};
use ruma_identifiers::UserId;
pub struct GlobalEdus {
pub(super) globallatestid_globallatest: sled::Tree, // Presence, GlobalLatestId = Count + UserId
//pub globalallid_globalall: sled::Tree, // ToDevice, GlobalAllId = UserId + Count
}
impl GlobalEdus {
/// Adds a global event which will be saved until a new event replaces it (e.g. presence updates).
pub fn update_globallatest(
&self,
user_id: &UserId,
event: EduEvent,
globals: &super::globals::Globals,
) -> Result<()> {
// Remove old entry
if let Some(old) = self
.globallatestid_globallatest
.iter()
.keys()
.rev()
.filter_map(|r| r.ok())
.find(|key| {
key.rsplit(|&b| b == 0xff).next().unwrap() == user_id.to_string().as_bytes()
})
{
// This is the old global_latest
self.globallatestid_globallatest.remove(old)?;
}
let mut global_latest_id = globals.next_count()?.to_be_bytes().to_vec();
global_latest_id.push(0xff);
global_latest_id.extend_from_slice(&user_id.to_string().as_bytes());
self.globallatestid_globallatest
.insert(global_latest_id, &*serde_json::to_string(&event)?)?;
Ok(())
}
/// Returns an iterator over the most recent presence updates that happened after the event with id `since`.
pub fn globallatests_since(
&self,
since: u64,
) -> Result<impl Iterator<Item = Result<EventJson<EduEvent>>>> {
let first_possible_edu = since.to_be_bytes().to_vec();
Ok(self
.globallatestid_globallatest
.range(&*first_possible_edu..)
// Skip the first pdu if it's exactly at since, because we sent that last time
.skip(
if self
.globallatestid_globallatest
.get(first_possible_edu)?
.is_some()
{
1
} else {
0
},
)
.filter_map(|r| r.ok())
.map(|(_, v)| Ok(serde_json::from_slice(&v)?)))
}
}

View File

@ -79,14 +79,6 @@ impl RoomEdus {
.map(|(_, v)| Ok(serde_json::from_slice(&v)?)))
}
/// Returns a vector of the most recent read_receipts in a room that happened after the event with id `since`.
pub fn roomlatests_all(
&self,
room_id: &RoomId,
) -> Result<impl Iterator<Item = Result<EventJson<EduEvent>>>> {
self.roomlatests_since(room_id, 0)
}
/// Adds an event that will be saved until the `timeout` timestamp (e.g. typing notifications).
pub fn roomactive_add(
&self,

View File

@ -59,7 +59,12 @@ pub async fn send_request<T: Endpoint>(
request_map.insert("destination".to_owned(), destination.into());
let mut request_json = request_map.into();
ruma_signatures::sign_json(db.globals.hostname(), db.globals.keypair(), &mut request_json).unwrap();
ruma_signatures::sign_json(
db.globals.hostname(),
db.globals.keypair(),
&mut request_json,
)
.unwrap();
let signatures = request_json["signatures"]
.as_object()
@ -85,7 +90,11 @@ pub async fn send_request<T: Endpoint>(
);
}
let reqwest_response = db.globals.reqwest_client().execute(http_request.into()).await;
let reqwest_response = db
.globals
.reqwest_client()
.execute(http_request.into())
.await;
// Because reqwest::Response -> http::Response is complicated:
match reqwest_response {